From 822d923c5a630afb3e6764cf615ce6899130c2d1 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 17 Sep 2021 09:38:13 -0500 Subject: [PATCH 01/12] add extractor_key param --- pyclowder/connectors.py | 22 ++++++++++++++++------ pyclowder/extractors.py | 9 ++++++++- 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 3e0fb9b..5881458 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -66,7 +66,7 @@ class Connector(object): registered_clowder = list() def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, max_retry=10): + mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -77,6 +77,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m else: self.mounted_paths = mounted_paths self.clowder_url = clowder_url + self.extractor_key = extractor_key self.max_retry = max_retry filename = 'notifications.json' @@ -391,10 +392,16 @@ def _process_message(self, body): return # register extractor - url = "%sapi/extractors" % source_host - if url not in Connector.registered_clowder: - Connector.registered_clowder.append(url) - self.register_extractor("%s?key=%s" % (url, secret_key)) + if self.extractor_key is None: + url = "%sapi/extractors" % source_host + if url not in Connector.registered_clowder: + Connector.registered_clowder.append(url) + self.register_extractor("%s?key=%s" % (url, secret_key)) + else: + url = "%sapi/extractors/private/%s" % (source_host, self.extractor_key) + if url not in Connector.registered_clowder: + Connector.registered_clowder.append(url) + self.register_extractor("%s?key=%s" % (url, secret_key)) # tell everybody we are starting to process the file self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.") @@ -630,7 +637,7 @@ class RabbitMQConnector(Connector): def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, - heartbeat=5*60, clowder_url=None, max_retry=10): + heartbeat=5*60, clowder_url=None, max_retry=10, extractor_key=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, ssl_verify, mounted_paths, clowder_url, max_retry) self.rabbitmq_uri = rabbitmq_uri @@ -640,6 +647,9 @@ def __init__(self, extractor_name, extractor_info, self.rabbitmq_queue = extractor_info['name'] else: self.rabbitmq_queue = rabbitmq_queue + self.extractor_key = extractor_key + if extractor_key is not None: + self.rabbitmq_queue += extractor_key self.channel = None self.connection = None self.consumer_tag = None diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index 9402c60..b71ed83 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -66,6 +66,7 @@ def __init__(self): rabbitmq_exchange = os.getenv('RABBITMQ_EXCHANGE', "") clowder_url = os.getenv("CLOWDER_URL", "") registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "") + extractor_key = os.getenv("EXTRACTOR_KEY", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") input_file_path = os.getenv("INPUT_FILE_PATH") @@ -90,6 +91,9 @@ def __init__(self): self.parser.add_argument('--register', '-r', nargs='?', dest="registration_endpoints", default=registration_endpoints, help='Clowder registration URL (default=%s)' % registration_endpoints) + self.parser.add_argument('--key', '-k', dest="extractor_key", + default=extractor_key, + help='Unique key to use for extractor queue ID (sets extractor to private)') self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri, help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%")) self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename', @@ -155,6 +159,7 @@ def start(self): else: rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", "."))) + logger.info('Creating connector with key '+self.args.extractor_key) connector = RabbitMQConnector(self.args.rabbitmq_queuename, self.extractor_info, check_message=self.check_message, @@ -165,8 +170,10 @@ def start(self): rabbitmq_queue=self.args.rabbitmq_queuename, mounted_paths=json.loads(self.args.mounted_paths), clowder_url=self.args.clowder_url, - max_retry=self.args.max_retry) + max_retry=self.args.max_retry, + extractor_key=self.args.extractor_key) connector.connect() + logger.info("new version check OK") connector.register_extractor(self.args.registration_endpoints) threading.Thread(target=connector.listen, name="RabbitMQConnector").start() From 7db635959e596c3995124f1d5170123a92b2e1f1 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 17 Sep 2021 10:21:40 -0500 Subject: [PATCH 02/12] Update extractors.py --- pyclowder/extractors.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index b71ed83..a2d62ed 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -173,7 +173,6 @@ def start(self): max_retry=self.args.max_retry, extractor_key=self.args.extractor_key) connector.connect() - logger.info("new version check OK") connector.register_extractor(self.args.registration_endpoints) threading.Thread(target=connector.listen, name="RabbitMQConnector").start() From 05be2b1237d0eba7116746b591fadf7f6721fc9b Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 22 Oct 2021 10:58:34 -0500 Subject: [PATCH 03/12] add clowder email param --- pyclowder/connectors.py | 58 +++++++++++++++++++++++------------------ pyclowder/extractors.py | 15 +++++++++-- 2 files changed, 45 insertions(+), 28 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 5881458..cd632ce 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -66,7 +66,7 @@ class Connector(object): registered_clowder = list() def __init__(self, extractor_name, extractor_info, check_message=None, process_message=None, ssl_verify=True, - mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None): + mounted_paths=None, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): self.extractor_name = extractor_name self.extractor_info = extractor_info self.check_message = check_message @@ -77,6 +77,7 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m else: self.mounted_paths = mounted_paths self.clowder_url = clowder_url + self.clowder_email = clowder_email self.extractor_key = extractor_key self.max_retry = max_retry @@ -392,16 +393,10 @@ def _process_message(self, body): return # register extractor - if self.extractor_key is None: - url = "%sapi/extractors" % source_host - if url not in Connector.registered_clowder: - Connector.registered_clowder.append(url) - self.register_extractor("%s?key=%s" % (url, secret_key)) - else: - url = "%sapi/extractors/private/%s" % (source_host, self.extractor_key) - if url not in Connector.registered_clowder: - Connector.registered_clowder.append(url) - self.register_extractor("%s?key=%s" % (url, secret_key)) + url = "%sapi/extractors" % source_host + if url not in Connector.registered_clowder: + self.register_extractor("%s?key=%s" % (url, secret_key)) + Connector.registered_clowder.append(url) # tell everybody we are starting to process the file self.status_update(pyclowder.utils.StatusMessage.start, resource, "Started processing.") @@ -519,18 +514,25 @@ def register_extractor(self, endpoints): headers = {'Content-Type': 'application/json'} data = self.extractor_info + if self.extractor_key is not None and len(self.extractor_key) > 0: + data["unique_key"] = self.extractor_key + logger.info("Registering extractor with key "+self.extractor_key) + logger.info(endpoints) for url in endpoints.split(','): - if url not in Connector.registered_clowder: - Connector.registered_clowder.append(url) - try: - result = requests.post(url.strip(), headers=headers, - data=json.dumps(data), - verify=self.ssl_verify) - result.raise_for_status() - logger.debug("Registering extractor with %s : %s", url, result.text) - except Exception as exc: # pylint: disable=broad-except - logger.exception('Error in registering extractor: ' + str(exc)) + logger.info(url) + logger.info("submitting...") + if "unique_key" in data: + if url.find("?") > -1: url += "&user=%s" % self.clowder_email + else: url += "?user=%s" % self.clowder_email # TODO: This will not work, need an auth key matching email + try: + result = requests.post(url.strip(), headers=headers, + data=json.dumps(data), + verify=self.ssl_verify) + result.raise_for_status() + logger.info("Registering extractor as %s : %s", url, result.text) + except Exception as exc: # pylint: disable=broad-except + logger.exception('Error in registering extractor: ' + str(exc)) # pylint: disable=no-self-use def status_update(self, status, resource, message): @@ -637,9 +639,9 @@ class RabbitMQConnector(Connector): def __init__(self, extractor_name, extractor_info, rabbitmq_uri, rabbitmq_exchange=None, rabbitmq_key=None, rabbitmq_queue=None, check_message=None, process_message=None, ssl_verify=True, mounted_paths=None, - heartbeat=5*60, clowder_url=None, max_retry=10, extractor_key=None): + heartbeat=10, clowder_url=None, max_retry=10, extractor_key=None, clowder_email=None): super(RabbitMQConnector, self).__init__(extractor_name, extractor_info, check_message, process_message, - ssl_verify, mounted_paths, clowder_url, max_retry) + ssl_verify, mounted_paths, clowder_url, max_retry, extractor_key, clowder_email) self.rabbitmq_uri = rabbitmq_uri self.rabbitmq_exchange = rabbitmq_exchange self.rabbitmq_key = rabbitmq_key @@ -649,7 +651,7 @@ def __init__(self, extractor_name, extractor_info, self.rabbitmq_queue = rabbitmq_queue self.extractor_key = extractor_key if extractor_key is not None: - self.rabbitmq_queue += extractor_key + self.rabbitmq_queue += ".UK__" + extractor_key self.channel = None self.connection = None self.consumer_tag = None @@ -697,7 +699,7 @@ def connect(self): routing_key="extractors." + self.extractor_name) # start the extractor announcer - self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.rabbitmq_queue, self.heartbeat) + self.announcer = RabbitMQBroadcast(self.rabbitmq_uri, self.extractor_info, self.clowder_email, self.rabbitmq_queue, self.heartbeat) self.announcer.start_thread() def listen(self): @@ -801,10 +803,11 @@ def on_message(self, channel, method, header, body): class RabbitMQBroadcast: - def __init__(self, rabbitmq_uri, extractor_info, rabbitmq_queue, heartbeat): + def __init__(self, rabbitmq_uri, extractor_info, clowder_email, rabbitmq_queue, heartbeat): self.active = True self.rabbitmq_uri = rabbitmq_uri self.extractor_info = extractor_info + self.clowder_email = clowder_email self.rabbitmq_queue = rabbitmq_queue self.heartbeat = heartbeat self.id = str(uuid.uuid4()) @@ -834,6 +837,7 @@ def send_heartbeat(self): message = { 'id': self.id, 'queue': self.rabbitmq_queue, + 'owner': self.clowder_email, 'extractor_info': self.extractor_info } next_heartbeat = time.time() @@ -841,6 +845,8 @@ def send_heartbeat(self): try: self.channel.connection.process_data_events() if time.time() >= next_heartbeat: + logging.getLogger(__name__).info("Sending heartbeat...") + logging.getLogger(__name__).info(message) self.channel.basic_publish(exchange='extractors', routing_key='', body=json.dumps(message)) next_heartbeat = time.time() + self.heartbeat except SystemExit: diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index a2d62ed..f3aaae9 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -67,6 +67,7 @@ def __init__(self): clowder_url = os.getenv("CLOWDER_URL", "") registration_endpoints = os.getenv('REGISTRATION_ENDPOINTS', "") extractor_key = os.getenv("EXTRACTOR_KEY", "") + clowder_email = os.getenv("CLOWDER_EMAIL", "") logging_config = os.getenv("LOGGING") mounted_paths = os.getenv("MOUNTED_PATHS", "{}") input_file_path = os.getenv("INPUT_FILE_PATH") @@ -94,6 +95,9 @@ def __init__(self): self.parser.add_argument('--key', '-k', dest="extractor_key", default=extractor_key, help='Unique key to use for extractor queue ID (sets extractor to private)') + self.parser.add_argument('--user', '-u', dest="clowder_email", + default=clowder_email, + help='Email address of Clowder user who will initially be assigned ownership (ignored if no --key provided)') self.parser.add_argument('--rabbitmqURI', nargs='?', dest='rabbitmq_uri', default=rabbitmq_uri, help='rabbitMQ URI (default=%s)' % rabbitmq_uri.replace("%", "%%")) self.parser.add_argument('--rabbitmqQUEUE', nargs='?', dest='rabbitmq_queuename', @@ -159,7 +163,6 @@ def start(self): else: rabbitmq_key.append("*.%s.%s" % (key, mt.replace("/", "."))) - logger.info('Creating connector with key '+self.args.extractor_key) connector = RabbitMQConnector(self.args.rabbitmq_queuename, self.extractor_info, check_message=self.check_message, @@ -171,9 +174,17 @@ def start(self): mounted_paths=json.loads(self.args.mounted_paths), clowder_url=self.args.clowder_url, max_retry=self.args.max_retry, - extractor_key=self.args.extractor_key) + extractor_key=self.args.extractor_key, + clowder_email=self.args.clowder_email) connector.connect() connector.register_extractor(self.args.registration_endpoints) + + # TODO: register extractor initially without _process_message? + url = "%sapi/extractors" % self.args.clowder_url + if url not in connector.registered_clowder: + connector.register_extractor("%s" % (url)) + connector.registered_clowder.append(url) + threading.Thread(target=connector.listen, name="RabbitMQConnector").start() elif self.args.connector == "HPC": From af10b3b495af76f1e4b2bc69c820b460018ac58e Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 22 Oct 2021 11:01:05 -0500 Subject: [PATCH 04/12] remove logs --- pyclowder/connectors.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index cd632ce..4840212 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -516,12 +516,8 @@ def register_extractor(self, endpoints): data = self.extractor_info if self.extractor_key is not None and len(self.extractor_key) > 0: data["unique_key"] = self.extractor_key - logger.info("Registering extractor with key "+self.extractor_key) - logger.info(endpoints) for url in endpoints.split(','): - logger.info(url) - logger.info("submitting...") if "unique_key" in data: if url.find("?") > -1: url += "&user=%s" % self.clowder_email else: url += "?user=%s" % self.clowder_email # TODO: This will not work, need an auth key matching email From d8bb039cf5702d3f60a7e70f7c08d9394bc2c90a Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Mon, 25 Oct 2021 09:08:15 -0500 Subject: [PATCH 05/12] update private key format --- pyclowder/connectors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 4840212..c9d5107 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -647,7 +647,7 @@ def __init__(self, extractor_name, extractor_info, self.rabbitmq_queue = rabbitmq_queue self.extractor_key = extractor_key if extractor_key is not None: - self.rabbitmq_queue += ".UK__" + extractor_key + self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue) self.channel = None self.connection = None self.consumer_tag = None From f8add26deaee71d103d86f5bee26ad9e88603f85 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Mon, 25 Oct 2021 09:08:59 -0500 Subject: [PATCH 06/12] Catch empty keys --- pyclowder/connectors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index c9d5107..b53077e 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -646,7 +646,7 @@ def __init__(self, extractor_name, extractor_info, else: self.rabbitmq_queue = rabbitmq_queue self.extractor_key = extractor_key - if extractor_key is not None: + if extractor_key is not None and len(extractor_key) > 0: self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue) self.channel = None self.connection = None From 7d16557f2c7b07d5f1b55a054ac508c516984a9b Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 17 Aug 2022 10:33:25 -0500 Subject: [PATCH 07/12] avoid registration fail in private case --- pyclowder/connectors.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index b53077e..6e14558 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -519,8 +519,12 @@ def register_extractor(self, endpoints): for url in endpoints.split(','): if "unique_key" in data: - if url.find("?") > -1: url += "&user=%s" % self.clowder_email - else: url += "?user=%s" % self.clowder_email # TODO: This will not work, need an auth key matching email + if url.find("?") > -1: + url += "&user=%s" % self.clowder_email + else: + logger.info("Unable to register extractor without an API key.") + return + #url += "?user=%s" % self.clowder_email # TODO: This will not work, need an auth key matching email try: result = requests.post(url.strip(), headers=headers, data=json.dumps(data), From bb5928e16bc33e1907c1ded985066baa57478110 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 9 Nov 2022 10:45:11 -0600 Subject: [PATCH 08/12] Update pyclowder/connectors.py Co-authored-by: Rob Kooper --- pyclowder/connectors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 6e14558..381c272 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -514,7 +514,7 @@ def register_extractor(self, endpoints): headers = {'Content-Type': 'application/json'} data = self.extractor_info - if self.extractor_key is not None and len(self.extractor_key) > 0: + if self.extractor_key: data["unique_key"] = self.extractor_key for url in endpoints.split(','): From b5dcb7529a6fdedd84f5532b5493bfb623d6f06e Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 9 Nov 2022 10:46:00 -0600 Subject: [PATCH 09/12] Update pyclowder/connectors.py Co-authored-by: Rob Kooper --- pyclowder/connectors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 381c272..69f0f1c 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -650,7 +650,7 @@ def __init__(self, extractor_name, extractor_info, else: self.rabbitmq_queue = rabbitmq_queue self.extractor_key = extractor_key - if extractor_key is not None and len(extractor_key) > 0: + if extractor_key: self.rabbitmq_queue = "private.%s.%s" % (extractor_key, self.rabbitmq_queue) self.channel = None self.connection = None From 0fe9e9da70ac97fbe999f45122aa4efe244af714 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 9 Nov 2022 10:56:53 -0600 Subject: [PATCH 10/12] remove comments & unnecessary logs --- pyclowder/connectors.py | 3 --- pyclowder/extractors.py | 1 - 2 files changed, 4 deletions(-) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 381c272..b695da2 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -524,7 +524,6 @@ def register_extractor(self, endpoints): else: logger.info("Unable to register extractor without an API key.") return - #url += "?user=%s" % self.clowder_email # TODO: This will not work, need an auth key matching email try: result = requests.post(url.strip(), headers=headers, data=json.dumps(data), @@ -845,8 +844,6 @@ def send_heartbeat(self): try: self.channel.connection.process_data_events() if time.time() >= next_heartbeat: - logging.getLogger(__name__).info("Sending heartbeat...") - logging.getLogger(__name__).info(message) self.channel.basic_publish(exchange='extractors', routing_key='', body=json.dumps(message)) next_heartbeat = time.time() + self.heartbeat except SystemExit: diff --git a/pyclowder/extractors.py b/pyclowder/extractors.py index f3aaae9..573a529 100644 --- a/pyclowder/extractors.py +++ b/pyclowder/extractors.py @@ -179,7 +179,6 @@ def start(self): connector.connect() connector.register_extractor(self.args.registration_endpoints) - # TODO: register extractor initially without _process_message? url = "%sapi/extractors" % self.args.clowder_url if url not in connector.registered_clowder: connector.register_extractor("%s" % (url)) From a8c72622565044dcb39eb76a4fb0174e0ef1c249 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Wed, 9 Nov 2022 11:57:56 -0600 Subject: [PATCH 11/12] Update CHANGELOG.md --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9e37f6..9ec377c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## Unreleased + +### Added +- Add support for `EXTRACTOR_KEY` and `CLOWDER_EMAIL` environment variables to register +an extractor for just one user. + ## 2.4.1 - 2021-07-21 ### Added From 3343a33c2f9317623bd470c1faff555cef42ece6 Mon Sep 17 00:00:00 2001 From: Max Burnette Date: Fri, 3 May 2024 08:14:18 -0500 Subject: [PATCH 12/12] set unique_key in extractor_info if provided --- pyclowder/connectors.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyclowder/connectors.py b/pyclowder/connectors.py index 362bb1c..14543be 100644 --- a/pyclowder/connectors.py +++ b/pyclowder/connectors.py @@ -76,6 +76,8 @@ def __init__(self, extractor_name, extractor_info, check_message=None, process_m self.clowder_url = clowder_url self.clowder_email = clowder_email self.extractor_key = extractor_key + if extractor_key: + self.extractor_info["unique_key"] = extractor_key self.max_retry = max_retry filename = 'notifications.json'