From 0c9a0c741370e80076e1225c6c148cb0fe1194c7 Mon Sep 17 00:00:00 2001 From: Carolyn Duan <108763985+cduanfigma@users.noreply.github.com> Date: Mon, 22 Jul 2024 10:09:57 -0700 Subject: [PATCH] Add pagination to get_workflow_history (#290) * Add pagination to get_workflow_history * Fix CI --- lib/temporal/client.rb | 28 +++++++++++------ spec/unit/lib/temporal/client_spec.rb | 44 +++++++++++++++++++++------ 2 files changed, 54 insertions(+), 18 deletions(-) diff --git a/lib/temporal/client.rb b/lib/temporal/client.rb index e738e2b6..a49b5ffb 100644 --- a/lib/temporal/client.rb +++ b/lib/temporal/client.rb @@ -400,13 +400,23 @@ def fail_activity(async_token, exception) # # @return [Temporal::Workflow::History] workflow's execution history def get_workflow_history(namespace: nil, workflow_id:, run_id:) - history_response = connection.get_workflow_execution_history( - namespace: namespace || config.default_execution_options.namespace, - workflow_id: workflow_id, - run_id: run_id - ) + next_page_token = nil + events = [] + loop do + response = + connection.get_workflow_execution_history( + namespace: namespace || config.default_execution_options.namespace, + workflow_id: workflow_id, + run_id: run_id, + next_page_token: next_page_token, + ) + events.concat(response.history.events.to_a) + next_page_token = response.next_page_token + + break if next_page_token.empty? + end - Workflow::History.new(history_response.history.events) + Workflow::History.new(events) end # Fetch workflow's execution history as JSON. This output can be used for replay testing. @@ -459,12 +469,12 @@ def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {}, def query_workflow_executions(namespace, query, filter: {}, next_page_token: nil, max_page_size: nil) validate_filter(filter, :status, :workflow, :workflow_id) - + Temporal::Workflow::Executions.new(connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter)) end # Count the number of workflows matching the provided query - # + # # @param namespace [String] # @param query [String] # @@ -500,7 +510,7 @@ def remove_custom_search_attributes(*attribute_names, namespace: nil) def list_schedules(namespace, maximum_page_size:, next_page_token: '') connection.list_schedules(namespace: namespace, maximum_page_size: maximum_page_size, next_page_token: next_page_token) end - + # Describe a schedule in a namespace # # @param namespace [String] namespace to list schedules in diff --git a/spec/unit/lib/temporal/client_spec.rb b/spec/unit/lib/temporal/client_spec.rb index 1dd4995d..bc970f31 100644 --- a/spec/unit/lib/temporal/client_spec.rb +++ b/spec/unit/lib/temporal/client_spec.rb @@ -300,7 +300,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument) it 'raises when signal_input is given but signal_name is not' do expect do subject.start_workflow( - TestStartWorkflow, + TestStartWorkflow, [42, 54], [43, 55], options: { signal_input: 'what do you get if you multiply six by nine?', } @@ -361,7 +361,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument) describe '#describe_namespace' do before { allow(connection).to receive(:describe_namespace).and_return(Temporalio::Api::WorkflowService::V1::DescribeNamespaceResponse.new) } - + it 'passes the namespace to the connection' do result = subject.describe_namespace('new-namespace') @@ -381,7 +381,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument) .to have_received(:signal_workflow_execution) .with( namespace: 'default-test-namespace', - signal: 'signal', + signal: 'signal', workflow_id: 'workflow_id', run_id: 'run_id', input: nil, @@ -395,7 +395,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument) .to have_received(:signal_workflow_execution) .with( namespace: 'default-test-namespace', - signal: 'signal', + signal: 'signal', workflow_id: 'workflow_id', run_id: 'run_id', input: 'input', @@ -409,7 +409,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument) .to have_received(:signal_workflow_execution) .with( namespace: 'other-test-namespace', - signal: 'signal', + signal: 'signal', workflow_id: 'workflow_id', run_id: 'run_id', input: nil, @@ -449,7 +449,7 @@ class NamespacedWorkflow < Temporal::Workflow ) end - it 'can override the namespace' do + it 'can override the namespace' do completed_event = Fabricate(:workflow_completed_event, result: nil) response = Fabricate(:workflow_execution_history, events: [completed_event]) @@ -534,7 +534,7 @@ class NamespacedWorkflow < Temporal::Workflow end.to raise_error(Temporal::WorkflowCanceled) end - it 'raises TimeoutError when the server times out' do + it 'raises TimeoutError when the server times out' do response = Fabricate(:workflow_execution_history, events: []) expect(connection) .to receive(:get_workflow_execution_history) @@ -895,6 +895,32 @@ class NamespacedWorkflow < Temporal::Workflow end end + describe '#get_workflow_history' do + it 'gets full history with pagination' do + completed_event = Fabricate(:workflow_completed_event, result: nil) + response_1 = Fabricate(:workflow_execution_history, events: [completed_event], next_page_token: 'a') + response_2 = Fabricate(:workflow_execution_history, events: [completed_event], next_page_token: '') + + allow(connection) + .to receive(:get_workflow_execution_history) + .and_return(response_1, response_2) + + subject.get_workflow_history(namespace: namespace, workflow_id: workflow_id, run_id: run_id) + + expect(connection) + .to have_received(:get_workflow_execution_history) + .with(namespace: namespace, workflow_id: workflow_id, run_id: run_id, next_page_token: nil) + .ordered + + expect(connection) + .to have_received(:get_workflow_execution_history) + .with(namespace: namespace, workflow_id: workflow_id, run_id: run_id, next_page_token: 'a') + .ordered + + expect(connection).to have_received(:get_workflow_execution_history).exactly(2).times + end + end + describe '#list_open_workflow_executions' do let(:from) { Time.now - 600 } let(:now) { Time.now } @@ -977,7 +1003,7 @@ class NamespacedWorkflow < Temporal::Workflow end end - it 'returns the next page token and paginates correctly' do + it 'returns the next page token and paginates correctly' do executions1 = subject.list_open_workflow_executions(namespace, from, max_page_size: 10) executions1.map do |execution| expect(execution).to be_an_instance_of(Temporal::Workflow::ExecutionInfo) @@ -1009,7 +1035,7 @@ class NamespacedWorkflow < Temporal::Workflow .once end - it 'returns the next page and paginates correctly' do + it 'returns the next page and paginates correctly' do executions1 = subject.list_open_workflow_executions(namespace, from, max_page_size: 10) executions1.map do |execution| expect(execution).to be_an_instance_of(Temporal::Workflow::ExecutionInfo)