From 94687a6ea4d7b68e0ddf4f157c3cf73dbcb9f944 Mon Sep 17 00:00:00 2001 From: Morgan hunter Date: Mon, 23 Sep 2024 10:57:29 +1000 Subject: [PATCH] fix: add a loop to await_workflow_result so it will keep retrying until the workflow is completed - this is based on this PR: https://github.com/coinbase/temporal-ruby/pull/305/files --- lib/temporal/client.rb | 40 +++++++++++++++------------ spec/unit/lib/temporal/client_spec.rb | 25 +++++++++++++++++ 2 files changed, 48 insertions(+), 17 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index fc390e92..b4f7dd78 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -230,25 +230,31 @@ def await_workflow_result(workflow, workflow_id:, run_id: nil, timeout: nil, nam execution_options = ExecutionOptions.new(workflow, options, config.default_execution_options) max_timeout = Temporal::Connection::GRPC::SERVER_MAX_GET_WORKFLOW_EXECUTION_HISTORY_POLL history_response = nil - begin - history_response = connection.get_workflow_execution_history( - namespace: execution_options.namespace, - workflow_id: workflow_id, - run_id: run_id, - wait_for_new_event: true, - event_type: :close, - timeout: timeout || max_timeout, - ) - rescue GRPC::DeadlineExceeded => e - message = if timeout - "Timed out after your specified limit of timeout: #{timeout} seconds" - else - "Timed out after #{max_timeout} seconds, which is the maximum supported amount." + closed_event = nil + loop do + begin + history_response = connection.get_workflow_execution_history( + namespace: execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id, + wait_for_new_event: true, + event_type: :close, + timeout: timeout || max_timeout, + ) + rescue GRPC::DeadlineExceeded => e + message = if timeout + "Timed out after your specified limit of timeout: #{timeout} seconds" + else + "Timed out after #{max_timeout} seconds, which is the maximum supported amount." + end + raise TimeoutError.new(message) end - raise TimeoutError.new(message) + history = Workflow::History.new(history_response.history.events) + closed_event = history.events.first + + break if closed_event end - history = Workflow::History.new(history_response.history.events) - closed_event = history.events.first + case closed_event.type when 'WORKFLOW_EXECUTION_COMPLETED' payloads = closed_event.attributes.result diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index a0ba4dc1..6e0b53e8 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -449,6 +449,31 @@ class NamespacedWorkflow < Temporal::Workflow ) end + it 'retries if there is till there is no closed event' do + completed_event = Fabricate(:workflow_completed_event, result: nil) + response_with_no_closed_event = Fabricate(:workflow_execution_history, events: []) + response_with_closed_event = Fabricate(:workflow_execution_history, events: [completed_event]) + + expect(connection) + .to receive(:get_workflow_execution_history) + .with( + namespace: 'some-namespace', + workflow_id: workflow_id, + run_id: run_id, + wait_for_new_event: true, + event_type: :close, + timeout: 30, + ) + .and_return(response_with_no_closed_event, response_with_closed_event) + + + subject.await_workflow_result( + NamespacedWorkflow, + workflow_id: workflow_id, + run_id: run_id, + ) + end + it 'can override the namespace' do completed_event = Fabricate(:workflow_completed_event, result: nil) response = Fabricate(:workflow_execution_history, events: [completed_event])