From f42b3443f4fdb5ac938772eeea16af1372b1b762 Mon Sep 17 00:00:00 2001 From: vismayak Date: Thu, 9 Jan 2025 14:57:37 -0600 Subject: [PATCH] tested extractor on single file and is working --- pyclowder/connectors.py | 28 +++++++++++++++++++--------- pyclowder/extractors.py | 5 +++++ 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 14543be..eea9990 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -63,7 +63,7 @@ class Connector(object): """ def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): + mounted_paths=None, minio_mounted_path=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -73,6 +73,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m self.mounted_paths = {} else: self.mounted_paths = mounted_paths + if minio_mounted_path is None: + self.minio_mounted_path = '' + else: + self.minio_mounted_path = minio_mounted_path self.clowder_url = clowder_url self.clowder_email = clowder_email self.extractor_key = extractor_key @@ -268,8 +272,14 @@ def _build_resource(self, body, host, secret_key, clowder_version): "metadata": body['metadata'] } - def _check_for_local_file(self, file_metadata): + def _check_for_local_file(self, file_metadata, file_id=None): """ Try to get pointer to locally accessible copy of file for extractor.""" + # Check if file is present in a minio mount (only valid for Clowder v2) + if self.minio_mounted_path and file_id: + minio_file_path = self.minio_mounted_path + "/" + file_id + print("Checking for minio local file: %s" % minio_file_path) + if os.path.isfile(minio_file_path): + return minio_file_path # first check if file is accessible locally if 'filepath' in file_metadata: @@ -278,7 +288,7 @@ def _check_for_local_file(self, file_metadata): # first simply check if file is present locally if os.path.isfile(file_path): return file_path - + # otherwise check any mounted paths... if len(self.mounted_paths) > 0: for source_path in self.mounted_paths: @@ -427,7 +437,7 @@ def _process_message(self, body): try: if check_result != pyclowder.utils.CheckMessage.bypass: file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"]) - file_path = self._check_for_local_file(file_metadata) + file_path = self._check_for_local_file(file_metadata, resource["id"]) if not file_path: file_path = pyclowder.files.download(self, host, secret_key, resource["id"], resource["intermediate_id"], @@ -628,10 +638,10 @@ class RabbitMQConnector(Connector): # pylint: disable=too-many-arguments def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None, - check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, + check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, minio_mounted_path=None, heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email) + ssl_verify, mounted_paths, minio_mounted_path, clowder_url, max_retry, extractor_key, clowder_email) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_key = rabbitmq_key if rabbitmq_queue is None: @@ -756,7 +766,7 @@ def on_message(self, channel, method, header, body): job_id = None self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message, - self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url, + self.process_message, self.ssl_verify, self.mounted_paths, self.minio_mounted_path, self.clowder_url, method, header, body) self.worker.start_thread(json_body) @@ -836,10 +846,10 @@ class RabbitMQHandler(Connector): """ def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, method=None, header=None, body=None, max_retry=10): + mounted_paths=None, minio_mounted_path=None, clowder_url=None, method=None, header=None, body=None, max_retry=10): super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry) + ssl_verify, mounted_paths, minio_mounted_path,clowder_url, max_retry) self.method = method self.header = header self.body = body diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 60a3a87..ee1b480 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -72,6 +72,7 @@ def __init__(self): clowder_email = os.getenv("CLOWDER_EMAIL", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") + minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "") input_file_path = os.getenv("INPUT_FILE_PATH") output_file_path = os.getenv("OUTPUT_FILE_PATH") connector_default = "RabbitMQ" @@ -105,6 +106,8 @@ def __init__(self): help='rabbitMQ queue name (default=%s)' % rabbitmq_queuename) self.parser.add_argument('--mounts', '-m', dest="mounted_paths", default=mounted_paths, help="dictionary of {'remote path':'local path'} mount mappings") + self.parser.add_argument('--minio-mount', dest="minio_mounted_path", default=minio_mounted_path, + help="path to mount Minio storage") self.parser.add_argument('--input-file-path', '-ifp', dest="input_file_path", default=input_file_path, help="Full path to local input file to be processed (used by Big Data feature)") self.parser.add_argument('--output-file-path', '-ofp', dest="output_file_path", default=output_file_path, @@ -175,6 +178,7 @@ def start(self): rabbitmq_key=rabbitmq_key, rabbitmq_queue=self.args.rabbitmq_queuename, mounted_paths=json.loads(self.args.mounted_paths), + minio_mounted_path=self.args.minio_mounted_path, clowder_url=self.args.clowder_url, max_retry=self.args.max_retry, heartbeat=self.args.heartbeat, @@ -193,6 +197,7 @@ def start(self): process_message=self.process_message, picklefile=self.args.hpc_picklefile, mounted_paths=json.loads(self.args.mounted_paths), + minio_mounted_path=self.args.minio_mounted_path, max_retry=self.args.max_retry) threading.Thread(target=connector.listen, name="HPCConnector").start()