mirror of
https://github.com/searxng/searxng.git
synced 2024-06-02 13:29:22 +00:00
86b4d2f2d0
We have been using a static type checker (pyright) for a long time, but its check was not yet a prerequisite for passing the quality gate. It was checked in the CI, but the error messages were only logged. As is always the case in life with checks that you have to do but which have no consequences: you neglect them :-) We didn't activate the checks back then because we (even today) have too much monkey patching in our code (not only in the engines; httpx and other objects are also affected). We have wanted to replace monkey patching with clear interfaces for a long time; the basis for this is increased typing, and we can only achieve this if we make type checking an integral part of the quality gate. This PR activates the type check; in order to pass the check, a few typings were corrected in the code, but most type inconsistencies were deactivated via inline comments. This was particularly necessary in places where the code uses properties that stick to the objects (monkey patching). The sticking of properties only happens in a few places, but the access to these properties extends over the entire code, which is why there are many `# type: ignore` markers in the code ... which we will hopefully be able to remove again successively in the future. Signed-off-by: Markus Heiser <markus.heiser@darmarit.de>
351 lines
13 KiB
Python
351 lines
13 KiB
Python
# SPDX-License-Identifier: AGPL-3.0-or-later
|
|
# pylint: disable=invalid-name, missing-module-docstring, missing-class-docstring
|
|
|
|
from typing import Any
|
|
from abc import abstractmethod, ABC
|
|
import re
|
|
|
|
from searx import settings
|
|
from searx.sxng_locales import sxng_locales
|
|
from searx.engines import categories, engines, engine_shortcuts
|
|
from searx.external_bang import get_bang_definition_and_autocomplete
|
|
from searx.search import EngineRef
|
|
from searx.webutils import VALID_LANGUAGE_CODE
|
|
|
|
|
|
class QueryPartParser(ABC):
    """Abstract base class of the parsers for one part of a raw text query.

    A parser instance keeps a reference to the query object it updates
    (``raw_text_query``) and a flag telling whether it should also fill
    that object's ``autocomplete_list`` (``enable_autocomplete``).
    """

    __slots__ = ("raw_text_query", "enable_autocomplete")

    def __init__(self, raw_text_query, enable_autocomplete):
        self.enable_autocomplete = enable_autocomplete
        self.raw_text_query = raw_text_query

    @staticmethod
    @abstractmethod
    def check(raw_value) -> Any:
        """Tell whether this parser is able to parse ``raw_value``."""

    @abstractmethod
    def __call__(self, raw_value) -> Any:
        """Try to parse ``raw_value`` and update ``self.raw_text_query``.

        Returns True if ``raw_value`` has been parsed.

        When ``self.enable_autocomplete`` is True,
        ``self.raw_text_query.autocomplete_list`` is modified as well.
        """

    def _add_autocomplete(self, value):
        """Append ``value`` to the autocomplete list unless it is already there."""
        suggestions = self.raw_text_query.autocomplete_list
        if value in suggestions:
            return
        suggestions.append(value)
|
|
|
|
|
|
class TimeoutParser(QueryPartParser):
    """Parse a ``<N`` query part that forces the timeout of the search.

    A number below 100 is taken as seconds, a number of 100 or above as
    milliseconds.  The resulting limit is stored, in seconds, in
    ``raw_text_query.timeout_limit``.
    """

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with ``<``."""
        # startswith() instead of indexing: an empty string yields False
        # rather than an IndexError.
        return raw_value.startswith('<')

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if it holds a valid timeout.

        When autocompletion is enabled and nothing follows the ``<``,
        example timeouts are added to the autocomplete list.
        """
        value = raw_value[1:]
        found = self._parse(value) if value else False
        if self.enable_autocomplete and not value:
            self._autocomplete()
        return found

    def _parse(self, value):
        """Store the timeout given by the text ``value``.

        Returns False (and leaves the query untouched) when ``value`` is
        not a plain non-negative integer.
        """
        # str.isdecimal() rather than str.isdigit(): characters such as
        # '²' satisfy isdigit() but make int() raise ValueError.
        if not value.isdecimal():
            return False
        try:
            raw_timeout_limit = int(value)
        except ValueError:
            # int() can still refuse the text, e.g. when it is longer than
            # the interpreter's limit for str -> int conversion.
            return False
        if raw_timeout_limit < 100:
            # below 100, the unit is the second ( <3 = 3 seconds timeout )
            self.raw_text_query.timeout_limit = float(raw_timeout_limit)
        else:
            # 100 or above, the unit is the millisecond ( <850 = 850 milliseconds timeout )
            self.raw_text_query.timeout_limit = raw_timeout_limit / 1000.0
        return True

    def _autocomplete(self):
        """Suggest one example in seconds and one in milliseconds."""
        for suggestion in ['<3', '<850']:
            self._add_autocomplete(suggestion)
|
|
|
|
|
|
class LanguageParser(QueryPartParser):
    """Parse a ``:language`` query part that forces the search language."""

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with ``:``."""
        return raw_value[0] == ':'

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if a language has been selected.

        When autocompletion is enabled and nothing has been found,
        language suggestions are added to the autocomplete list.
        """
        # lower case, with '-' as the only separator (":en_US" -> "en-us")
        value = raw_value[1:].lower().replace('_', '-')
        found = self._parse(value) if value else False
        if not found and self.enable_autocomplete:
            self._autocomplete(value)
        return found

    def _parse(self, value):
        """Add the language(s) designated by ``value`` to the query.

        ``value`` is compared with the language id, the language name, the
        english name and the country of each entry of ``sxng_locales``.
        Returns True if at least one language has been added.
        """
        languages = self.raw_text_query.languages
        # countries are written with spaces, the value uses '-' (see __call__)
        country_value = value.replace('-', ' ')
        found = False

        for locale in sxng_locales:
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            matches = value in (lang_id, lang_name, english_name) or country_value == country
            if not matches or value in languages:
                continue

            # a declared language matches: use it as a search language
            found = True
            id_parts = lang_id.split('-')
            if len(id_parts) == 2:
                languages.append(f'{id_parts[0]}-{id_parts[1].upper()}')
            else:
                languages.append(lang_id)

            # an exact language id is the best match there is (the first
            # match is not necessarily the best one): stop searching
            if value == lang_id:
                break

        # user may set a valid, yet not selectable language
        if value == 'auto' or VALID_LANGUAGE_CODE.match(value):
            code_parts = value.split('-')
            if len(code_parts) > 1:
                value = code_parts[0].lower() + '-' + code_parts[1].upper()
            if value not in languages:
                languages.append(value)
                found = True

        return found

    def _autocomplete(self, value):
        """Fill the autocomplete list with languages starting with ``value``.

        With an empty ``value`` only a few examples are suggested.
        """
        selectable = settings['search']['languages']

        if not value:
            # show some example queries
            if len(selectable) < 10:
                examples = [':' + lang for lang in selectable]
            else:
                examples = [":en", ":en_us", ":english", ":united_kingdom"]
            self.raw_text_query.autocomplete_list.extend(examples)
            return

        # here "new_zealand" is "new-zealand" (see __call__)
        country_prefix = value.replace('-', ' ')
        short_prefix = len(value) <= 2

        for locale in sxng_locales:
            if locale[0] not in selectable:
                continue
            lang_id, lang_name, country, english_name, _flag = map(str.lower, locale)

            # the query starts like the language id
            if lang_id.startswith(value):
                suggestion = lang_id.split('-')[0] if short_prefix else lang_id
                self._add_autocomplete(':' + suggestion)

            # the query starts like the language name
            if lang_name.startswith(value) or english_name.startswith(value):
                self._add_autocomplete(':' + lang_name)

            # the query starts like the country
            if country.startswith(country_prefix):
                self._add_autocomplete(':' + country.replace(' ', '_'))
|
|
|
|
|
|
class ExternalBangParser(QueryPartParser):
    """Parse a ``!!bang`` query part that selects an external bang."""

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` is ``!!`` followed by at least one character."""
        return len(raw_value) > 2 and raw_value.startswith('!!')

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if it names a known external bang.

        When autocompletion is enabled, external bang suggestions are added
        to the autocomplete list, whether or not a bang has been found.
        """
        value = raw_value[2:]
        if value:
            found, bang_ac_list = self._parse(value)
        else:
            found, bang_ac_list = False, []
        if self.enable_autocomplete:
            self._autocomplete(bang_ac_list)
        return found

    def _parse(self, value):
        """Look ``value`` up with ``get_bang_definition_and_autocomplete``.

        Returns a tuple ``(found, bang_ac_list)``.  When a bang definition
        is returned, ``value`` is stored in ``raw_text_query.external_bang``.
        """
        bang_definition, bang_ac_list = get_bang_definition_and_autocomplete(value)
        found = bang_definition is not None
        if found:
            self.raw_text_query.external_bang = value
        return found, bang_ac_list

    def _autocomplete(self, bang_ac_list):
        """Suggest the bangs of ``bang_ac_list``, or a few examples if it is empty."""
        for external_bang in bang_ac_list or ['g', 'ddg', 'bing']:
            self._add_autocomplete('!!' + external_bang)
|
|
|
|
|
|
class BangParser(QueryPartParser):
    """Parse a ``!name`` query part that forces an engine or a category."""

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` starts with one ``!`` but not with ``!!``."""
        # the slice is empty for a lone '!', which is accepted as well
        return raw_value[0] == '!' and raw_value[1:2] != '!'

    def __call__(self, raw_value):
        """Parse ``raw_value``; return True if an engine or a category matched.

        On a match ``raw_text_query.specific`` is set.  When autocompletion
        is enabled, suggestions are added whether or not there is a match.
        """
        prefix = raw_value[0]
        # names are written with spaces: "!duckduckgo_images" -> "duckduckgo images"
        value = raw_value[1:].replace('_', ' ').replace('-', ' ')
        found = self._parse(value) if value else False
        if found and prefix == '!':
            self.raw_text_query.specific = True
        if self.enable_autocomplete:
            self._autocomplete(prefix, value)
        return found

    def _parse(self, value):
        """Add the engine(s) designated by ``value`` to ``enginerefs``.

        ``value`` may be an engine shortcut, an engine name or a category
        name.  Returns True if it is one of them.
        """
        query = self.raw_text_query

        # an engine shortcut stands for the engine name
        if value in engine_shortcuts:  # pylint: disable=consider-using-get
            value = engine_shortcuts[value]

        # engine name
        if value in engines:
            query.enginerefs.append(EngineRef(value, 'none'))
            return True

        if value not in categories:
            return False

        # category name: use all the engines declared under that category,
        # except those listed in disabled_engines
        enabled_refs = [
            EngineRef(engine.name, value)
            for engine in categories[value]
            if (engine.name, value) not in query.disabled_engines
        ]
        query.enginerefs.extend(enabled_refs)
        return True

    def _autocomplete(self, first_char, value):
        """Suggest categories, engines and shortcuts starting with ``value``.

        With an empty ``value`` only a few examples are suggested.
        """
        if not value:
            # show some example queries
            # NOTE(review): _parse() looks up (engine name, category) tuples in
            # disabled_engines while plain strings are looked up here -- confirm
            # this test can ever reject a suggestion.
            disabled = self.raw_text_query.disabled_engines
            for suggestion in ['images', 'wikipedia', 'osm']:
                if suggestion in disabled and suggestion not in categories:
                    continue
                self._add_autocomplete(first_char + suggestion)
            return

        # categories starting with the value
        for category in categories:
            if category.startswith(value):
                self._add_autocomplete(first_char + category.replace(' ', '_'))

        # engine names starting with the value
        for engine in engines:
            if engine.startswith(value):
                self._add_autocomplete(first_char + engine.replace(' ', '_'))

        # engine shortcuts starting with the value
        for engine_shortcut in engine_shortcuts:
            if engine_shortcut.startswith(value):
                self._add_autocomplete(first_char + engine_shortcut)
|
|
|
|
|
|
class FeelingLuckyParser(QueryPartParser):
    """Parse the ``!!`` query part that asks for a redirect to the first result."""

    @staticmethod
    def check(raw_value):
        """Return True if ``raw_value`` is exactly ``!!``."""
        return '!!' == raw_value

    def __call__(self, raw_value):
        """Set ``raw_text_query.redirect_to_first_result``; always return True."""
        query = self.raw_text_query
        query.redirect_to_first_result = True
        return True
|
|
|
|
|
|
class RawTextQuery:
    """Parse a raw text query (the value from the html input).

    The query is split on whitespace.  Each part is offered to the classes
    of ``PARSER_CLASSES``; a part accepted by a parser is a "special" part
    and is kept in ``query_parts``, any other part belongs to the user's
    search terms and is kept in ``user_query_parts``.
    """

    # The first class whose check() accepts a part handles it, so the order
    # matters.
    PARSER_CLASSES = [
        TimeoutParser,  # force the timeout
        LanguageParser,  # force a language
        ExternalBangParser,  # external bang (must be before BangParser)
        BangParser,  # force an engine or category
        FeelingLuckyParser,  # redirect to the first link in the results list
    ]

    def __init__(self, query, disabled_engines):
        """Parse ``query`` right away.

        :param query: the raw query, must be a ``str``
        :param disabled_engines: collection consulted by the parsers; a
            falsy value is replaced by an empty list
        """
        assert isinstance(query, str)
        # input parameters
        self.query = query
        self.disabled_engines = disabled_engines or []
        # parsed values
        self.enginerefs = []
        self.languages = []
        self.timeout_limit = None
        self.external_bang = None
        self.specific = False
        self.autocomplete_list = []
        # internal properties
        self.query_parts = []  # use self.getFullQuery()
        self.user_query_parts = []  # use self.getQuery()
        # (list, index) of the last non-blank part, None if there is none
        self.autocomplete_location = None
        self.redirect_to_first_result = False
        self._parse_query()

    def _parse_query(self):
        """
        parse self.query, if tags are set, which
        change the search engine or search-language
        """

        # split query, including whitespaces
        raw_query_parts = re.split(r'(\s+)', self.query)

        last_index_location = None
        # only the very last item of the split may trigger autocompletion
        autocomplete_index = len(raw_query_parts) - 1

        for i, query_part in enumerate(raw_query_parts):
            # part does only contain spaces, skip
            if query_part.isspace() or query_part == '':
                continue

            # parse special commands
            special_part = False
            for parser_class in RawTextQuery.PARSER_CLASSES:
                if parser_class.check(query_part):
                    special_part = parser_class(self, i == autocomplete_index)(query_part)
                    break

            # append query part to query_part list
            qlist = self.query_parts if special_part else self.user_query_parts
            qlist.append(query_part)
            last_index_location = (qlist, len(qlist) - 1)

        self.autocomplete_location = last_index_location

    def get_autocomplete_full_query(self, text):
        """Return the full query with the part under completion replaced by ``text``.

        When there is no part to replace (the query holds no non-blank
        part, or ``changeQuery`` has been given an empty query), ``text``
        becomes the first user query part instead of raising.
        """
        location = self.autocomplete_location
        if location is None or not location[0]:
            # Nothing to replace.  Remember the new location so that a
            # following call replaces this part instead of adding one more.
            self.user_query_parts.append(text)
            self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        else:
            qlist, position = location
            qlist[position] = text
        return self.getFullQuery()

    def changeQuery(self, query):
        """Replace the user part of the query by ``query``; return ``self``.

        The special parts are kept and the autocomplete list is emptied.
        """
        self.user_query_parts = query.strip().split()
        self.query = self.getFullQuery()
        self.autocomplete_location = (self.user_query_parts, len(self.user_query_parts) - 1)
        self.autocomplete_list = []
        return self

    def getQuery(self):
        """Return the user query parts joined by single spaces."""
        return ' '.join(self.user_query_parts)

    def getFullQuery(self):
        """
        get full query including whitespaces
        """
        special = ' '.join(self.query_parts)
        return f'{special} {self.getQuery()}'.strip()

    def __str__(self):
        return self.getFullQuery()

    def __repr__(self):
        # the closing '>' comes after the last field
        return (
            f"<{self.__class__.__name__} "
            f"query={self.query!r} "
            f"disabled_engines={self.disabled_engines!r}\n "
            f"languages={self.languages!r} "
            f"timeout_limit={self.timeout_limit!r} "
            f"external_bang={self.external_bang!r} "
            f"specific={self.specific!r} "
            f"enginerefs={self.enginerefs!r}\n "
            f"autocomplete_list={self.autocomplete_list!r}\n "
            f"query_parts={self.query_parts!r}\n "
            f"user_query_parts={self.user_query_parts!r}\n "
            f"redirect_to_first_result={self.redirect_to_first_result!r} >"
        )
|