diff --git a/resources/config.sample.json b/resources/config.sample.json index ff2bb55..1615a09 100644 --- a/resources/config.sample.json +++ b/resources/config.sample.json @@ -1,4 +1,6 @@ { + "nexus_file": "", + "done_writing_message_file": "", "config_file": "", "id": "", "dataset": { @@ -14,7 +16,11 @@ }, "ingestion": { "dry_run": false, - "offline_ingestor_executable": "background_ingestor", + "offline_ingestor_executable": [ + "background_ingestor" + ], + "max_offline_ingestors": 10, + "offline_ingestors_wait_time": 10, "schemas_directory": "schemas", "check_if_dataset_exists_by_pid": true, "check_if_dataset_exists_by_metadata": true, diff --git a/src/scicat_configuration.py b/src/scicat_configuration.py index 5bba866..7d9ee18 100644 --- a/src/scicat_configuration.py +++ b/src/scicat_configuration.py @@ -222,10 +222,18 @@ class FileHandlingOptions: file_path_type: str = "relative" # allowed values: absolute and relative +def default_offline_ingestor_executable() -> list[str]: + return ["background_ingestor"] + + @dataclass(kw_only=True) class IngestionOptions: dry_run: bool = False - offline_ingestor_executable: str = "background_ingestor" + offline_ingestor_executable: list[str] = field( + default_factory=default_offline_ingestor_executable + ) + max_offline_ingestors: int = 10 + offline_ingestors_wait_time: int = 10 schemas_directory: str = "schemas" check_if_dataset_exists_by_pid: bool = True check_if_dataset_exists_by_metadata: bool = True @@ -303,6 +311,8 @@ class OnlineIngestorConfig: # original_dict: Mapping """Original configuration dictionary in the json file.""" + nexus_file: str = "" + done_writing_message_file: str = "" config_file: str id: str = "" dataset: DatasetOptions = field(default_factory=DatasetOptions) diff --git a/src/scicat_online_ingestor.py b/src/scicat_online_ingestor.py index b489d8c..0ac6723 100644 --- a/src/scicat_online_ingestor.py +++ b/src/scicat_online_ingestor.py @@ -6,6 +6,7 @@ import logging import pathlib import subprocess +from time import sleep try: __version__ = importlib.metadata.version(__package__ or __name__) @@ -59,7 +60,14 @@ def dump_message_to_file_if_needed( logger.info("Message file saved") -def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logger): +def _individual_message_commit(job_id, message, consumer, logger: logging.Logger): + logger.info("Executing commit for message with job id %s", job_id) + consumer.commit(message=message) + + +def _check_offline_ingestors( + offline_ingestors, consumer, config, logger: logging.Logger +) -> int: logger.info("%s offline ingestors running", len(offline_ingestors)) for job_id, job_item in offline_ingestors.items(): result = job_item["proc"].poll() @@ -69,8 +77,12 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) if result == 0: logger.info("Offline ingestor successful for job id %s", job_id) - logger.info("Executing commit for message with job id %s", job_id) - consumer.commit(message=job_item["message"]) + # if background process is successful + # check if we need to commit the individual message + if config.kafka.individual_message_commit: + _individual_message_commit( + job_id, job_item["message"], consumer, logger + ) else: logger.error("Offline ingestor error for job id %s", job_id) logger.info( @@ -78,6 +90,8 @@ def _individual_message_commit(offline_ingestors, consumer, logger: logging.Logg ) offline_ingestors.pop(job_id) + return len(offline_ingestors) + def build_online_config(logger: logging.Logger | None = None) -> OnlineIngestorConfig: arg_parser = build_arg_parser( @@ -136,7 +150,7 @@ def main() -> None: # optional depending on the message_saving_options.message_output """ cmd = [ - config.ingestion.offline_ingestor_executable, + *config.ingestion.offline_ingestor_executable, "-c", config.config_file, "--nexus-file", @@ -166,16 +180,28 @@ def main() -> None: if config.ingestion.dry_run: logger.info("Dry run mode enabled. Skipping background ingestor.") else: + logger.info("Checking number of offline ingestor") + offline_ingestor_runnings: int = _check_offline_ingestors( + offline_ingestors, consumer, config, logger + ) + while ( + offline_ingestor_runnings + >= config.ingestion.max_offline_ingestors + ): + sleep(config.ingestion.offline_ingestors_wait_time) + offline_ingestor_runnings = _check_offline_ingestors( + offline_ingestors, consumer, config, logger + ) + + logger.info( + "Offline ingestors currently running %s", + offline_ingestor_runnings, + ) logger.info("Running background ingestor with command above") proc = subprocess.Popen(cmd) # noqa: S603 # save info about the background process offline_ingestors[job_id] = {"proc": proc, "message": message} - # if background process is successful - # check if we need to commit the individual message - if config.kafka.individual_message_commit: - _individual_message_commit(offline_ingestors, consumer, logger) - if __name__ == "__main__": main()