diff --git a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py index 33fbf70ed..94cf91310 100644 --- a/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py +++ b/compute_endpoint/globus_compute_endpoint/endpoint/endpoint_manager.py @@ -415,8 +415,8 @@ def _event_loop(self): self.wait_for_children() try: - _command = self._command_queue.get(timeout=1.0) - d_tag, props, body = _command + d_tag, props, body = self._command_queue.get(timeout=1.0) + self._command.ack(d_tag) if props.headers and props.headers.get("debug", False): body_log_b = _redact_url_creds(body, redact_user=False) log.warning( @@ -448,7 +448,6 @@ def _event_loop(self): f"Unable to deserialize Globus Compute services command." f" ({e.__class__.__name__}) {e}" ) - self._command.ack(d_tag) continue now = round(time.time()) @@ -461,7 +460,6 @@ def _event_loop(self): f"\n Command timestamp: {server_cmd_ts:,} ({server_pp_ts})" f"\n Endpoint timestamp: {now:,} ({endp_pp_ts})" ) - self._command.ack(d_tag) continue try: @@ -469,7 +467,6 @@ def _event_loop(self): identity_set = msg["globus_identity_set"] except Exception as e: log.error(f"Invalid server command. ({e.__class__.__name__}) {e}") - self._command.ack(d_tag) continue identity_for_log = ( @@ -485,13 +482,11 @@ def _event_loop(self): "Identity failed to map to a local user name." f" ({e.__class__.__name__}) {e}{identity_for_log}" ) - self._command.ack(d_tag) continue except Exception as e: msg = "Unhandled error attempting to map user." log.debug(f"{msg}{identity_for_log}", exc_info=e) log.error(f"{msg} ({e.__class__.__name__}) {e}{identity_for_log}") - self._command.ack(d_tag) continue try: @@ -503,7 +498,6 @@ def _event_loop(self): " Identity mapped to a local user name, but local user does not" f" exist.\n Local user name: {local_username}{identity_for_log}" ) - self._command.ack(d_tag) continue try: @@ -528,8 +522,6 @@ def _event_loop(self): f" args: {command_args}\n" f" kwargs: {command_kwargs}{identity_for_log}" ) - finally: - self._command.ack(d_tag) def cmd_start_endpoint( self,