Skip to content

Commit

Permalink
tested extractor on single file and is working
Browse files Browse the repository at this point in the history
  • Loading branch information
Vismayak committed Jan 9, 2025
1 parent 38ed787 commit f42b344
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 9 deletions.
28 changes: 19 additions & 9 deletions pyclowder/connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Connector(object):
"""

def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
mounted_paths=None, minio_mounted_path=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
self.extractor_name = extractor_name
self.extractor_info = extractor_info
self.check_message = check_message
Expand All @@ -73,6 +73,10 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m
self.mounted_paths = {}
else:
self.mounted_paths = mounted_paths
if minio_mounted_path is None:
self.minio_mounted_path = ''
else:
self.minio_mounted_path = minio_mounted_path
self.clowder_url = clowder_url
self.clowder_email = clowder_email
self.extractor_key = extractor_key
Expand Down Expand Up @@ -268,8 +272,14 @@ def _build_resource(self, body, host, secret_key, clowder_version):
"metadata": body['metadata']
}

def _check_for_local_file(self, file_metadata):
def _check_for_local_file(self, file_metadata, file_id=None):
""" Try to get pointer to locally accessible copy of file for extractor."""
# Check if file is present in a minio mount (only valid for Clowder v2)
if self.minio_mounted_path and file_id:
minio_file_path = self.minio_mounted_path + "/" + file_id
print("Checking for minio local file: %s" % minio_file_path)
if os.path.isfile(minio_file_path):
return minio_file_path

# first check if file is accessible locally
if 'filepath' in file_metadata:
Expand All @@ -278,7 +288,7 @@ def _check_for_local_file(self, file_metadata):
# first simply check if file is present locally
if os.path.isfile(file_path):
return file_path

# otherwise check any mounted paths...
if len(self.mounted_paths) > 0:
for source_path in self.mounted_paths:
Expand Down Expand Up @@ -427,7 +437,7 @@ def _process_message(self, body):
try:
if check_result != pyclowder.utils.CheckMessage.bypass:
file_metadata = pyclowder.files.download_info(self, host, secret_key, resource["id"])
file_path = self._check_for_local_file(file_metadata)
file_path = self._check_for_local_file(file_metadata, resource["id"])
if not file_path:
file_path = pyclowder.files.download(self, host, secret_key, resource["id"],
resource["intermediate_id"],
Expand Down Expand Up @@ -628,10 +638,10 @@ class RabbitMQConnector(Connector):
# pylint: disable=too-many-arguments
def __init__(self, extractor_name, extractor_info,
rabbitmq_uri, rabbitmq_key=None, rabbitmq_queue=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None,
check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, minio_mounted_path=None,
heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None):
super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email)
ssl_verify, mounted_paths, minio_mounted_path, clowder_url, max_retry, extractor_key, clowder_email)
self.rabbitmq_uri = rabbitmq_uri
self.rabbitmq_key = rabbitmq_key
if rabbitmq_queue is None:
Expand Down Expand Up @@ -756,7 +766,7 @@ def on_message(self, channel, method, header, body):
job_id = None

self.worker = RabbitMQHandler(self.extractor_name, self.extractor_info, job_id, self.check_message,
self.process_message, self.ssl_verify, self.mounted_paths, self.clowder_url,
self.process_message, self.ssl_verify, self.mounted_paths, self.minio_mounted_path, self.clowder_url,
method, header, body)
self.worker.start_thread(json_body)

Expand Down Expand Up @@ -836,10 +846,10 @@ class RabbitMQHandler(Connector):
"""

def __init__(self, extractor_name, extractor_info, job_id, check_message=None, process_message=None, ssl_verify=True,
mounted_paths=None, clowder_url=None, method=None, header=None, body=None, max_retry=10):
mounted_paths=None, minio_mounted_path=None, clowder_url=None, method=None, header=None, body=None, max_retry=10):

super(RabbitMQHandler, self).__init__(extractor_name, extractor_info, check_message, process_message,
ssl_verify, mounted_paths, clowder_url, max_retry)
ssl_verify, mounted_paths, minio_mounted_path,clowder_url, max_retry)
self.method = method
self.header = header
self.body = body
Expand Down
5 changes: 5 additions & 0 deletions pyclowder/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __init__(self):
clowder_email = os.getenv("CLOWDER_EMAIL", "")
logging_config = os.getenv("LOGGING")
mounted_paths = os.getenv("MOUNTED_PATHS", "{}")
minio_mounted_path = os.getenv("MINIO_MOUNTED_PATH", "")
input_file_path = os.getenv("INPUT_FILE_PATH")
output_file_path = os.getenv("OUTPUT_FILE_PATH")
connector_default = "RabbitMQ"
Expand Down Expand Up @@ -105,6 +106,8 @@ def __init__(self):
help='rabbitMQ queue name (default=%s)' % rabbitmq_queuename)
self.parser.add_argument('--mounts', '-m', dest="mounted_paths", default=mounted_paths,
help="dictionary of {'remote path':'local path'} mount mappings")
self.parser.add_argument('--minio-mount', dest="minio_mounted_path", default=minio_mounted_path,
help="path to mount Minio storage")
self.parser.add_argument('--input-file-path', '-ifp', dest="input_file_path", default=input_file_path,
help="Full path to local input file to be processed (used by Big Data feature)")
self.parser.add_argument('--output-file-path', '-ofp', dest="output_file_path", default=output_file_path,
Expand Down Expand Up @@ -175,6 +178,7 @@ def start(self):
rabbitmq_key=rabbitmq_key,
rabbitmq_queue=self.args.rabbitmq_queuename,
mounted_paths=json.loads(self.args.mounted_paths),
minio_mounted_path=self.args.minio_mounted_path,
clowder_url=self.args.clowder_url,
max_retry=self.args.max_retry,
heartbeat=self.args.heartbeat,
Expand All @@ -193,6 +197,7 @@ def start(self):
process_message=self.process_message,
picklefile=self.args.hpc_picklefile,
mounted_paths=json.loads(self.args.mounted_paths),
minio_mounted_path=self.args.minio_mounted_path,
max_retry=self.args.max_retry)
threading.Thread(target=connector.listen, name="HPCConnector").start()

Expand Down

0 comments on commit f42b344

Please sign in to comment.