# SPDX-License-Identifier: AGPL-3.0-or-later
# lint: pylint
# pylint: disable=missing-module-docstring, missing-class-docstring

import sys
from hashlib import sha256
from importlib import import_module
from os import listdir, makedirs, remove, stat, utime
from os.path import abspath, basename, dirname, exists, join
from shutil import copyfile
from pkgutil import iter_modules
from logging import getLogger
from typing import List, Tuple

from searx import logger, settings


class Plugin:  # pylint: disable=too-few-public-methods
    """This class is currently never initialized and only used for type hinting."""

    id: str
    name: str
    description: str
    default_on: bool
    # variable-length tuples of resource paths (``Tuple[str]`` would mean a 1-tuple)
    js_dependencies: Tuple[str, ...]
    css_dependencies: Tuple[str, ...]
    preference_section: str


logger = logger.getChild("plugins")

# attributes every plugin module must define, with their expected type
required_attrs = (
    # fmt: off
    ("name", str),
    ("description", str),
    ("default_on", bool)
    # fmt: on
)

# attributes that are reset to an empty value of the given type when they are
# missing or of the wrong type
optional_attrs = (
    # fmt: off
    ("js_dependencies", tuple),
    ("css_dependencies", tuple),
    ("preference_section", str),
    # fmt: on
)


def sha_sum(filename):
    """Return the hexadecimal SHA-256 digest of the content of ``filename``."""
    with open(filename, "rb") as f:
        file_content_bytes = f.read()
        return sha256(file_content_bytes).hexdigest()


def sync_resource(base_path, resource_path, name, target_dir, plugin_dir):
    """Copy one plugin resource into ``target_dir`` if it is missing or differs.

    ``resource_path`` is joined onto ``base_path`` to locate the source file;
    ``name`` is only used in the error message.  The process exits with
    status 3 when the copy fails.

    Returns the web path of the resource
    (``plugins/external_plugins/<plugin_dir>/<file name>``).
    """
    dep_path = join(base_path, resource_path)
    file_name = basename(dep_path)
    resource_path = join(target_dir, file_name)
    if not exists(resource_path) or sha_sum(dep_path) != sha_sum(resource_path):
        try:
            copyfile(dep_path, resource_path)
            # copy atime_ns and mtime_ns, so the weak ETags (generated by
            # the HTTP server) do not change
            dep_stat = stat(dep_path)
            utime(resource_path, ns=(dep_stat.st_atime_ns, dep_stat.st_mtime_ns))
        except IOError:
            logger.critical("failed to copy plugin resource %s for plugin %s", file_name, name)
            sys.exit(3)

    # returning with the web path of the resource
    return join("plugins/external_plugins", plugin_dir, file_name)


def prepare_package_resources(plugin, plugin_module_name):
    """Synchronize the js/css resources of an external plugin into the static path.

    The resources are copied to
    ``<settings.ui.static_path>/plugins/external_plugins/<plugin_module_name>``,
    the plugin's ``js_dependencies`` / ``css_dependencies`` are replaced by
    lists of web paths, and files in the target directory that no longer
    belong to the plugin are removed.  The process exits with status 3 when
    the directory cannot be created or a stale file cannot be removed.
    """
    plugin_base_path = dirname(abspath(plugin.__file__))

    plugin_dir = plugin_module_name
    target_dir = join(settings["ui"]["static_path"], "plugins/external_plugins", plugin_dir)
    try:
        makedirs(target_dir, exist_ok=True)
    except IOError:
        logger.critical("failed to create resource directory %s for plugin %s", target_dir, plugin_module_name)
        sys.exit(3)

    # file names that are expected to exist in target_dir after the sync
    resources = set()

    for dependency_attr in ("js_dependencies", "css_dependencies"):
        if not hasattr(plugin, dependency_attr):
            continue
        dependencies = getattr(plugin, dependency_attr)
        resources.update(map(basename, dependencies))
        setattr(
            plugin,
            dependency_attr,
            [
                sync_resource(plugin_base_path, x, plugin_module_name, target_dir, plugin_dir)
                for x in dependencies
            ],
        )

    for f in listdir(target_dir):
        if basename(f) not in resources:
            resource_path = join(target_dir, basename(f))
            try:
                remove(resource_path)
            except IOError:
                logger.critical(
                    "failed to remove unused resource file %s for plugin %s", resource_path, plugin_module_name
                )
                sys.exit(3)


def load_plugin(plugin_module_name, external):
    """Import and validate the plugin module ``plugin_module_name``.

    The module gets an ``id`` (its module name) and a ``logger``; required
    attributes are checked, optional attributes are defaulted, and when
    ``external`` is true the plugin resources are copied to the static path.

    Returns the plugin module, or ``None`` when importing it raised an
    exception that is not considered fatal.  The process exits with status 3
    on fatal import errors and on missing or mistyped required attributes.
    """
    # pylint: disable=too-many-branches
    try:
        plugin = import_module(plugin_module_name)
    except (
        SyntaxError,
        KeyboardInterrupt,
        SystemExit,
        SystemError,
        ImportError,
        RuntimeError,
    ) as e:
        logger.critical("%s: fatal exception", plugin_module_name, exc_info=e)
        sys.exit(3)
    except BaseException:
        logger.exception("%s: exception while loading, the plugin is disabled", plugin_module_name)
        return None

    # difference with searx: use module name instead of the user name
    plugin.id = plugin_module_name

    # per-plugin logger, used when reporting exceptions raised by the plugin
    plugin.logger = getLogger(plugin_module_name)

    for plugin_attr, plugin_attr_type in required_attrs:
        if not hasattr(plugin, plugin_attr):
            logger.critical('%s: missing attribute "%s", cannot load plugin', plugin, plugin_attr)
            sys.exit(3)
        attr = getattr(plugin, plugin_attr)
        if not isinstance(attr, plugin_attr_type):
            type_attr = str(type(attr))
            # plugin first, attribute name second (the placeholders used to be swapped)
            logger.critical(
                '%s: attribute "%s" is of type %s, must be of type %s, cannot load plugin',
                plugin,
                plugin_attr,
                type_attr,
                plugin_attr_type,
            )
            sys.exit(3)

    # Remember whether the plugin declared a usable section *before* the
    # optional attributes are defaulted: afterwards the attribute always
    # exists, so a later hasattr() check could never apply the default.
    has_preference_section = isinstance(getattr(plugin, "preference_section", None), str)

    for plugin_attr, plugin_attr_type in optional_attrs:
        if not hasattr(plugin, plugin_attr) or not isinstance(getattr(plugin, plugin_attr), plugin_attr_type):
            setattr(plugin, plugin_attr, plugin_attr_type())

    if not has_preference_section:
        plugin.preference_section = "general"

    # query plugin
    if plugin.preference_section == "query":
        for plugin_attr in ("query_keywords", "query_examples"):
            if not hasattr(plugin, plugin_attr):
                logger.critical('missing attribute "%s", cannot load plugin: %s', plugin_attr, plugin)
                sys.exit(3)

    if settings.get("enabled_plugins"):
        # searx compatibility: plugin.name in settings['enabled_plugins']
        plugin.default_on = plugin.name in settings["enabled_plugins"] or plugin.id in settings["enabled_plugins"]

    # copy resources if this is an external plugin
    if external:
        prepare_package_resources(plugin, plugin_module_name)

    logger.debug("%s: loaded", plugin_module_name)

    return plugin


def load_and_initialize_plugin(plugin_module_name, external, init_args):
    """Load a plugin and, if it defines ``init``, call it with ``init_args``.

    Returns the plugin module, or ``None`` when loading failed, when
    ``init`` returned a falsy value or when ``init`` raised an exception.
    """
    plugin = load_plugin(plugin_module_name, external)
    if plugin and hasattr(plugin, 'init'):
        try:
            return plugin if plugin.init(*init_args) else None
        except Exception:  # pylint: disable=broad-except
            plugin.logger.exception("Exception while calling init, the plugin is disabled")
            return None
    return plugin


class PluginStore:
    def __init__(self):
        self.plugins: List[Plugin] = []

    def __iter__(self):
        yield from self.plugins

    def register(self, plugin):
        """Append ``plugin`` to the list of registered plugins."""
        self.plugins.append(plugin)

    def call(self, ordered_plugin_list, plugin_type, *args, **kwargs):
        """Call the hook named ``plugin_type`` on each plugin that defines it.

        Plugins are called in the order of ``ordered_plugin_list``; the loop
        stops at the first hook returning a falsy value.  An exception raised
        by a hook is logged and the next plugin is tried.

        Returns the value of the last hook that returned (``True`` when no
        hook returned at all).
        """
        ret = True
        for plugin in ordered_plugin_list:
            if hasattr(plugin, plugin_type):
                try:
                    ret = getattr(plugin, plugin_type)(*args, **kwargs)
                    if not ret:
                        break
                except Exception:  # pylint: disable=broad-except
                    plugin.logger.exception("Exception while calling %s", plugin_type)
        return ret


plugins = PluginStore()


def plugin_module_names():
    """Yield ``(module name, external)`` tuples for every plugin to load.

    Modules found next to this file are yielded first with ``external`` set
    to ``False``; the names listed in ``settings['plugins']`` follow with
    ``external`` set to ``True``, skipping names that were already yielded.
    """
    yield_plugins = set()

    # embedded plugins
    for module in iter_modules(path=[dirname(__file__)]):
        yield (__name__ + "." + module.name, False)
        yield_plugins.add(module.name)
    # external plugins
    for module_name in settings['plugins']:
        if module_name not in yield_plugins:
            yield (module_name, True)
            yield_plugins.add(module_name)


def initialize(app):
    """Load, initialize and register all plugins; ``init`` hooks receive ``(app, settings)``."""
    for module_name, external in plugin_module_names():
        plugin = load_and_initialize_plugin(module_name, external, (app, settings))
        if plugin:
            plugins.register(plugin)