# SPDX-License-Identifier: AGPL-3.0-or-later """With *command engines* administrators can run engines to integrate arbitrary shell commands. .. attention:: When creating and enabling a ``command`` engine on a public instance, you must be careful to avoid leaking private data. The easiest solution is to limit the access by setting ``tokens`` as described in section :ref:`private engines`. The engine base is flexible. Only your imagination can limit the power of this engine (and maybe security concerns). Configuration ============= The following options are available: ``command``: A comma separated list of the elements of the command. A special token ``{{QUERY}}`` tells where to put the search terms of the user. Example: .. code:: yaml ['ls', '-l', '-h', '{{QUERY}}'] ``delimiter``: A mapping containing a delimiter ``char`` and the *titles* of each element in ``keys``. ``parse_regex``: A dict containing the regular expressions for each result key. ``query_type``: The expected type of user search terms. Possible values: ``path`` and ``enum``. ``path``: Checks if the user provided path is inside the working directory. If not, the query is not executed. ``enum``: Is a list of allowed search terms. If the user submits something which is not included in the list, the query returns an error. ``query_enum``: A list containing allowed search terms if ``query_type`` is set to ``enum``. ``working_dir``: The directory where the command has to be executed. Default: ``./``. ``result_separator``: The character that separates results. Default: ``\\n``. Example ======= The example engine below can be used to find files with a specific name in the configured working directory: .. code:: yaml - name: find engine: command command: ['find', '.', '-name', '{{QUERY}}'] query_type: path shortcut: fnd delimiter: chars: ' ' keys: ['line'] Implementations =============== """ import re from os.path import expanduser, isabs, realpath, commonprefix from shlex import split as shlex_split from subprocess import Popen, PIPE from threading import Thread from searx import logger engine_type = 'offline' paging = True command = [] delimiter = {} parse_regex = {} query_type = '' query_enum = [] environment_variables = {} working_dir = realpath('.') result_separator = '\n' result_template = 'key-value.html' timeout = 4.0 _command_logger = logger.getChild('command') _compiled_parse_regex = {} def init(engine_settings): check_parsing_options(engine_settings) if 'command' not in engine_settings: raise ValueError('engine command : missing configuration key: command') global command, working_dir, delimiter, parse_regex, environment_variables # pylint: disable=global-statement command = engine_settings['command'] if 'working_dir' in engine_settings: working_dir = engine_settings['working_dir'] if not isabs(engine_settings['working_dir']): working_dir = realpath(working_dir) if 'parse_regex' in engine_settings: parse_regex = engine_settings['parse_regex'] for result_key, regex in parse_regex.items(): _compiled_parse_regex[result_key] = re.compile(regex, flags=re.MULTILINE) if 'delimiter' in engine_settings: delimiter = engine_settings['delimiter'] if 'environment_variables' in engine_settings: environment_variables = engine_settings['environment_variables'] def search(query, params): cmd = _get_command_to_run(query) if not cmd: return [] results = [] reader_thread = Thread(target=_get_results_from_process, args=(results, cmd, params['pageno'])) reader_thread.start() reader_thread.join(timeout=timeout) return results def _get_command_to_run(query): params = shlex_split(query) __check_query_params(params) cmd = [] for c in command: if c == '{{QUERY}}': cmd.extend(params) else: cmd.append(c) return cmd def _get_results_from_process(results, cmd, pageno): leftover = '' count = 0 start, end = __get_results_limits(pageno) with Popen(cmd, stdout=PIPE, stderr=PIPE, env=environment_variables) as process: line = process.stdout.readline() while line: buf = leftover + line.decode('utf-8') raw_results = buf.split(result_separator) if raw_results[-1]: leftover = raw_results[-1] raw_results = raw_results[:-1] for raw_result in raw_results: result = __parse_single_result(raw_result) if result is None: _command_logger.debug('skipped result:', raw_result) continue if start <= count and count <= end: # pylint: disable=chained-comparison result['template'] = result_template results.append(result) count += 1 if end < count: return results line = process.stdout.readline() return_code = process.wait(timeout=timeout) if return_code != 0: raise RuntimeError('non-zero return code when running command', cmd, return_code) return None def __get_results_limits(pageno): start = (pageno - 1) * 10 end = start + 9 return start, end def __check_query_params(params): if not query_type: return if query_type == 'path': query_path = params[-1] query_path = expanduser(query_path) if commonprefix([realpath(query_path), working_dir]) != working_dir: raise ValueError('requested path is outside of configured working directory') elif query_type == 'enum' and len(query_enum) > 0: for param in params: if param not in query_enum: raise ValueError('submitted query params is not allowed', param, 'allowed params:', query_enum) def check_parsing_options(engine_settings): """Checks if delimiter based parsing or regex parsing is configured correctly""" if 'delimiter' not in engine_settings and 'parse_regex' not in engine_settings: raise ValueError('failed to init settings for parsing lines: missing delimiter or parse_regex') if 'delimiter' in engine_settings and 'parse_regex' in engine_settings: raise ValueError('failed to init settings for parsing lines: too many settings') if 'delimiter' in engine_settings: if 'chars' not in engine_settings['delimiter'] or 'keys' not in engine_settings['delimiter']: raise ValueError def __parse_single_result(raw_result): """Parses command line output based on configuration""" result = {} if delimiter: elements = raw_result.split(delimiter['chars'], maxsplit=len(delimiter['keys']) - 1) if len(elements) != len(delimiter['keys']): return {} for i in range(len(elements)): # pylint: disable=consider-using-enumerate result[delimiter['keys'][i]] = elements[i] if parse_regex: for result_key, regex in _compiled_parse_regex.items(): found = regex.search(raw_result) if not found: return {} result[result_key] = raw_result[found.start() : found.end()] return result