diff --git a/suzieq/poller/controller/controller.py b/suzieq/poller/controller/controller.py index 2c7295b6dd..e2b8c8a5d0 100644 --- a/suzieq/poller/controller/controller.py +++ b/suzieq/poller/controller/controller.py @@ -1,12 +1,11 @@ -"""This module manages and coordinates all the plugins +""" +This module manages and coordinates all the plugins. Classes: - InventoryProvider: manages all the plugins + Controller: Manages all the plugins and coordinates their execution. Functions: - sq_main(): this function coordinates all the plugins initializing - and starting each one of them. The plugins are divided - in types. + sq_main(): Coordinates the initialization and startup of all plugins, which are divided by type. """ import argparse import asyncio @@ -18,8 +17,7 @@ from copy import deepcopy from suzieq.poller.controller.base_controller_plugin import ControllerPlugin -from suzieq.poller.controller.inventory_async_plugin import \ - InventoryAsyncPlugin +from suzieq.poller.controller.inventory_async_plugin import InventoryAsyncPlugin from suzieq.poller.worker.services.service_manager import ServiceManager from suzieq.shared.exceptions import InventorySourceError, SqPollerConfError from suzieq.shared.utils import sq_get_config_file @@ -30,41 +28,29 @@ class Controller: - """This class manages all the plugins set on the configuration files - """ + """Manages and coordinates all plugins specified in the configuration files.""" def __init__(self, args: argparse.Namespace, config_data: dict) -> None: - # contains the Plugin objects divided by type + # Container for plugin objects, divided by type self._plugin_objects = {} - # collect basePlugin classes + # Collect basePlugin classes self._base_plugin_classes = ControllerPlugin.get_plugins() + # Initialize the core components self.sources = [] self.chunker = None self.manager = None - - # Initialize configurations self._config = defaultdict(lambda: {}) self._config.update(config_data.get('poller', {})) - # Set controller configuration - # single_run_mode: ['gather', 'process', 'update', 'debug', - # 'input-dir'] - # tells if the poller should not run forever - # period: the update timeout of the inventory - # input-dir: wether to use a directory with some data as input - # no-coalescer: wether to use the coalescer or not, the coalescer - # is always disabled with run-once - # inventory_timeout: the maximum amount of time to wait for an - # inventory from a source + # Controller configuration self._single_run_mode = args.run_once - self._input_dir = args.input_dir if self._input_dir: self._single_run_mode = 'input-dir' - # If the debug mode is active we need to run the controller only once + # Debug mode adjusts the run behavior if args.debug: self._single_run_mode = 'debug' @@ -72,16 +58,20 @@ def __init__(self, args: argparse.Namespace, config_data: dict) -> None: if self._single_run_mode: self._no_coalescer = True - self._period = args.update_period or \ - self._config.get('update-period', 3600) + # Set polling period and timeouts + self._period = args.update_period or self._config.get('update-period', 3600) self._inventory_timeout = self._config.get('inventory-timeout', 10) - self._n_workers = args.workers or self._config.get( - 'manager', {}).get('workers', 1) + self._n_workers = args.workers or self._config.get('manager', {}).get('workers', 1) - # Validate the arguments + # Validate configuration arguments self._validate_controller_args(args, config_data) - # Get the inventory + # Setup inventory and manager configurations + self._setup_inventory(args) + self._setup_manager(args) + + def _setup_inventory(self, args: argparse.Namespace): + """Sets up the inventory configuration and validates its existence.""" default_inventory_file = DEFAULT_INVENTORY_PATH inventory_file = None @@ -90,215 +80,126 @@ def __init__(self, args: argparse.Namespace, config_data: dict) -> None: self._config.get('inventory-file') or \ default_inventory_file if not Path(inventory_file).is_file(): - if inventory_file != default_inventory_file: - raise SqPollerConfError( - f'Inventory file not found at {inventory_file}' - ) - raise SqPollerConfError( - 'Inventory file not found in the default location:' - f'{inventory_file}, use -I argument to provide it, ' - 'use -i instead to provide an input directory ' - 'with pre-captured output and simulate an input.' + f'Inventory file not found at {inventory_file}. Use -I argument to specify one.' ) else: if not Path(self._input_dir).is_dir(): - raise SqPollerConfError( - f'{self._input_dir} is not a valid directory' - ) + raise SqPollerConfError(f'{self._input_dir} is not a valid directory') + + def _setup_manager(self, args: argparse.Namespace): + """Sets up manager configuration based on command-line arguments and config data.""" + source_args = { + 'single-run-mode': self._single_run_mode, + 'path': DEFAULT_INVENTORY_PATH + } + + manager_args = { + 'config': sq_get_config_file(args.config), + 'config-dict': self._config, + 'debug': args.debug, + 'input-dir': self._input_dir, + 'exclude-services': args.exclude_services, + 'no-coalescer': self._no_coalescer, + 'output-dir': args.output_dir, + 'outputs': args.outputs, + 'max-cmd-pipeline': self._config.get('max-cmd-pipeline', 0), + 'single-run-mode': self._single_run_mode, + 'run-once': args.run_once, + 'service-only': args.service_only, + 'ssh-config-file': args.ssh_config_file, + 'workers': self._n_workers + } - # Get the maximum number of commands per second - max_cmd_pipeline = self._config.get('max-cmd-pipeline', 0) - if ((max_cmd_pipeline != 0) and - (max_cmd_pipeline % self._n_workers != 0)): - raise SqPollerConfError( - f'max-cmd-pipeline ({max_cmd_pipeline}) has to be a ' - f'multiple of the number of worker ({self._n_workers})') - - source_args = {'single-run-mode': self._single_run_mode, - 'path': inventory_file} - - manager_args = {'config': sq_get_config_file(args.config), - 'config-dict': config_data, - 'debug': args.debug, - 'input-dir': self._input_dir, - 'exclude-services': args.exclude_services, - 'no-coalescer': self._no_coalescer, - 'output-dir': args.output_dir, - 'outputs': args.outputs, - 'max-cmd-pipeline': max_cmd_pipeline, - # `single-run-mode` and `run-once` are different. - # The former is an internal variable telling the - # poller if it should run and terminate, the other - # is a special run mode for the worker. - 'single-run-mode': self._single_run_mode, - 'run-once': args.run_once, - 'service-only': args.service_only, - 'ssh-config-file': args.ssh_config_file, - 'workers': self._n_workers - } - - # Update configuration with command arguments self._config['source'].update(source_args) - self._config['manager'].update(manager_args) - if not self._config['manager'].get('type'): - self._config['manager']['type'] = 'static' - - if not self._config['chunker'].get('type'): - self._config['chunker']['type'] = 'static' - - @property - def single_run_mode(self) -> str: - """Returns the current single-run mode if any, if the poller should - run forever this function returns None - - Returns: - [str]: current single-run mode - """ - return self._single_run_mode - @property - def period(self) -> int: - """Returns the update period of the inventory + def _validate_controller_args(self, args: argparse.Namespace, cfg: Dict): + """Validates the controller arguments to ensure correct configuration.""" + if self._inventory_timeout < 1: + raise SqPollerConfError('Invalid inventory timeout: at least 1 second is required') - Returns: - [int]: update period in seconds - """ - return self._period + if self._period < 1: + raise SqPollerConfError('Invalid period: at least 1 second is required') - @property - def inventory_timeout(self) -> int: - """Returns the maximum amount of time to wait for an inventory source - retrieving its device list. + if self._n_workers < 1: + raise SqPollerConfError('At least one worker is required') - Returns: - int: inventory timeout in secodns - """ - return self._inventory_timeout + ServiceManager.get_service_list( + args.service_only, + args.exclude_services, + cfg['service-directory'] + ) def init(self): - """Loads the provider configuration and the plugins configurations - and initialize all the plugins - """ - - # Initialize the controller modules - logger.info('Initializing all the poller controller modules') + """Initializes the controller, loading plugins and their configurations.""" + logger.info('Initializing poller controller modules') - # If the input is a directory sources and chunkers are not needed - # since there is no need to build and split the inventory + # Initialize sources and chunkers if not using an input directory if not self._input_dir: - logger.debug('Inizialing sources') - + logger.debug('Initializing sources') self.sources = self.init_plugins('source') if not self.sources: - raise SqPollerConfError( - "The inventory file doesn't have any source" - ) - - # Initialize chunker module - logger.debug('Initialize chunker module') + raise SqPollerConfError('No source found in the inventory') + logger.debug('Initializing chunker') chunkers = self.init_plugins('chunker') if len(chunkers) > 1: - raise SqPollerConfError( - 'Only 1 Chunker at a time is supported' - ) + raise SqPollerConfError('Only one chunker at a time is supported') self.chunker = chunkers[0] - # initialize pollerManager - logger.debug('Initialize manager module') - + logger.debug('Initializing manager') managers = self.init_plugins('manager') if len(managers) > 1: - raise SqPollerConfError( - 'Only 1 manager at a time is supported' - ) + raise SqPollerConfError('Only one manager at a time is supported') self.manager = managers[0] def init_plugins(self, plugin_type: str) -> List[ControllerPlugin]: - """Initialize the controller plugins of type according - to the controller configuration - - Args: - plugin_type (str): type of plugins to initialize - - Raises: - SqPollerConfError: raised if a wrong configuration is passed - - Returns: - List[ControllerPlugin]: list of initialized plugins - """ + """Initializes the plugins of the specified type from the configuration.""" plugin_conf = self._config.get(plugin_type) or {} base_plugin_class = self._base_plugin_classes.get(plugin_type, None) if not base_plugin_class: - raise SqPollerConfError(f'Unknown plugin type {plugin_type}') + raise SqPollerConfError(f'Unknown plugin type: {plugin_type}') - # Initialize all the instances of the given plugin - self._plugin_objects[plugin_type] = base_plugin_class.init_plugins( - plugin_conf) + # Initialize and return plugins + self._plugin_objects[plugin_type] = base_plugin_class.init_plugins(plugin_conf) return self._plugin_objects[plugin_type] async def run(self): - """Start the device polling phase. - - In the kwargs are passed all the components that must be started - - Args: - controller (Controller): contains the informations for controller - and workers - """ - # When the poller receives a termination signal, we would like - # to gracefully terminate all the tasks + """Starts the device polling phase and manages the lifecycle of tasks.""" loop = asyncio.get_event_loop() for s in [signal.SIGTERM, signal.SIGINT]: - loop.add_signal_handler( - s, lambda s=s: asyncio.create_task(self._stop())) - - # Start collecting the tasks to launch - source_tasks = [s.run() for s in self.sources - if isinstance(s, InventoryAsyncPlugin)] - # Check if we need to launch the manager in background - manager_tasks = [] - if isinstance(self.manager, InventoryAsyncPlugin): - manager_tasks.append(self.manager.run()) - - # Append the synchronization manager + loop.add_signal_handler(s, lambda s=s: asyncio.create_task(self._stop())) + + # Prepare tasks for execution + source_tasks = [s.run() for s in self.sources if isinstance(s, InventoryAsyncPlugin)] + manager_tasks = [self.manager.run()] if isinstance(self.manager, InventoryAsyncPlugin) else [] controller_task = [self._inventory_sync()] - # Launch all the tasks - tasks = [asyncio.create_task(t) - for t in (source_tasks + manager_tasks + controller_task)] + # Launch all tasks asynchronously + tasks = [asyncio.create_task(t) for t in (source_tasks + manager_tasks + controller_task)] try: while tasks: - done, pending = await asyncio.wait( - tasks, - return_when=asyncio.FIRST_COMPLETED - ) - + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) tasks = list(pending) for task in done: if task.exception(): raise task.exception() - # Ignore completed task if started with single-run mode + if self._single_run_mode: continue - except asyncio.CancelledError: logger.warning('Received termination signal, terminating...') async def _stop(self): - """Stop the controller""" - - tasks = [t for t in asyncio.all_tasks() - if t is not asyncio.current_task()] - + """Gracefully stops all running tasks.""" + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] for task in tasks: task.cancel() async def _inventory_sync(self): - # With the input directory we do not launch the synchronization loop + """Synchronizes the inventory and distributes it among workers.""" if self._input_dir: await self.manager.launch_with_dir() return @@ -313,10 +214,7 @@ async def _inventory_sync(self): self._inventory_timeout )) except asyncio.TimeoutError: - raise InventorySourceError( - f'Timeout error: source {inv_src.name} took' - 'too much time' - ) + raise InventorySourceError(f'Timeout error: source {inv_src.name} took too long') logger.debug(f'Received inventory from {inv_src.name}') if cur_inv: