From c176483310330ca4d8c2602332ce6fff3d18ca67 Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Sun, 19 Nov 2023 06:56:23 -0500 Subject: [PATCH] Simplify command ACK For the time being, YAGNI, and recognize that we never do anything different; we *always* ACK the message, regardless of the processing outcome. No need to worry about "proper handling" later, if it's already done. --- .../endpoint/endpoint_manager.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py index 33fbf70ed..94cf91310 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py @@ -415,8 +415,8 @@ def _event_loop(self): self.wait_for_children() try: - _command = self._command_queue.get(timeout=1.0) - d_tag, props, body = _command + d_tag, props, body = self._command_queue.get(timeout=1.0) + self._command.ack(d_tag) if props.headers and props.headers.get("debug", False): body_log_b = _redact_url_creds(body, redact_user=False) log.warning( @@ -448,7 +448,6 @@ def _event_loop(self): f"Unable to deserialize Globus Compute services command." f" ({e.__class__.__name__}) {e}" ) - self._command.ack(d_tag) continue now = round(time.time()) @@ -461,7 +460,6 @@ def _event_loop(self): f"\n Command timestamp: {server_cmd_ts:,} ({server_pp_ts})" f"\n Endpoint timestamp: {now:,} ({endp_pp_ts})" ) - self._command.ack(d_tag) continue try: @@ -469,7 +467,6 @@ def _event_loop(self): identity_set = msg["globus_identity_set"] except Exception as e: log.error(f"Invalid server command. ({e.__class__.__name__}) {e}") - self._command.ack(d_tag) continue identity_for_log = ( @@ -485,13 +482,11 @@ def _event_loop(self): "Identity failed to map to a local user name." f" ({e.__class__.__name__}) {e}{identity_for_log}" ) - self._command.ack(d_tag) continue except Exception as e: msg = "Unhandled error attempting to map user." log.debug(f"{msg}{identity_for_log}", exc_info=e) log.error(f"{msg} ({e.__class__.__name__}) {e}{identity_for_log}") - self._command.ack(d_tag) continue try: @@ -503,7 +498,6 @@ def _event_loop(self): " Identity mapped to a local user name, but local user does not" f" exist.\n Local user name: {local_username}{identity_for_log}" ) - self._command.ack(d_tag) continue try: @@ -528,8 +522,6 @@ def _event_loop(self): f" args: {command_args}\n" f" kwargs: {command_kwargs}{identity_for_log}" ) - finally: - self._command.ack(d_tag) def cmd_start_endpoint( self,