Skip to content

Commit

Permalink
Add pagination to get_workflow_history (#290)
Browse files Browse the repository at this point in the history
* Add pagination to get_workflow_history

* Fix CI
  • Loading branch information
cduanfigma authored Jul 22, 2024
1 parent dc937e8 commit 0c9a0c7
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
28 changes: 19 additions & 9 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -400,13 +400,23 @@ def fail_activity(async_token, exception)
#
# @return [Temporal::Workflow::History] workflow's execution history
def get_workflow_history(namespace: nil, workflow_id:, run_id:)
history_response = connection.get_workflow_execution_history(
namespace: namespace || config.default_execution_options.namespace,
workflow_id: workflow_id,
run_id: run_id
)
next_page_token = nil
events = []
loop do
response =
connection.get_workflow_execution_history(
namespace: namespace || config.default_execution_options.namespace,
workflow_id: workflow_id,
run_id: run_id,
next_page_token: next_page_token,
)
events.concat(response.history.events.to_a)
next_page_token = response.next_page_token

break if next_page_token.empty?
end

Workflow::History.new(history_response.history.events)
Workflow::History.new(events)
end

# Fetch workflow's execution history as JSON. This output can be used for replay testing.
Expand Down Expand Up @@ -459,12 +469,12 @@ def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {},

def query_workflow_executions(namespace, query, filter: {}, next_page_token: nil, max_page_size: nil)
validate_filter(filter, :status, :workflow, :workflow_id)

Temporal::Workflow::Executions.new(connection: connection, status: :all, request_options: { namespace: namespace, query: query, next_page_token: next_page_token, max_page_size: max_page_size }.merge(filter))
end

# Count the number of workflows matching the provided query
#
#
# @param namespace [String]
# @param query [String]
#
Expand Down Expand Up @@ -500,7 +510,7 @@ def remove_custom_search_attributes(*attribute_names, namespace: nil)
def list_schedules(namespace, maximum_page_size:, next_page_token: '')
connection.list_schedules(namespace: namespace, maximum_page_size: maximum_page_size, next_page_token: next_page_token)
end

# Describe a schedule in a namespace
#
# @param namespace [String] namespace to list schedules in
Expand Down
44 changes: 35 additions & 9 deletions spec/unit/lib/temporal/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
it 'raises when signal_input is given but signal_name is not' do
expect do
subject.start_workflow(
TestStartWorkflow,
TestStartWorkflow,
[42, 54],
[43, 55],
options: { signal_input: 'what do you get if you multiply six by nine?', }
Expand Down Expand Up @@ -361,7 +361,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)

describe '#describe_namespace' do
before { allow(connection).to receive(:describe_namespace).and_return(Temporalio::Api::WorkflowService::V1::DescribeNamespaceResponse.new) }

it 'passes the namespace to the connection' do
result = subject.describe_namespace('new-namespace')

Expand All @@ -381,7 +381,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
.to have_received(:signal_workflow_execution)
.with(
namespace: 'default-test-namespace',
signal: 'signal',
signal: 'signal',
workflow_id: 'workflow_id',
run_id: 'run_id',
input: nil,
Expand All @@ -395,7 +395,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
.to have_received(:signal_workflow_execution)
.with(
namespace: 'default-test-namespace',
signal: 'signal',
signal: 'signal',
workflow_id: 'workflow_id',
run_id: 'run_id',
input: 'input',
Expand All @@ -409,7 +409,7 @@ def expect_signal_with_start(expected_arguments, expected_signal_argument)
.to have_received(:signal_workflow_execution)
.with(
namespace: 'other-test-namespace',
signal: 'signal',
signal: 'signal',
workflow_id: 'workflow_id',
run_id: 'run_id',
input: nil,
Expand Down Expand Up @@ -449,7 +449,7 @@ class NamespacedWorkflow < Temporal::Workflow
)
end

it 'can override the namespace' do
it 'can override the namespace' do
completed_event = Fabricate(:workflow_completed_event, result: nil)
response = Fabricate(:workflow_execution_history, events: [completed_event])

Expand Down Expand Up @@ -534,7 +534,7 @@ class NamespacedWorkflow < Temporal::Workflow
end.to raise_error(Temporal::WorkflowCanceled)
end

it 'raises TimeoutError when the server times out' do
it 'raises TimeoutError when the server times out' do
response = Fabricate(:workflow_execution_history, events: [])
expect(connection)
.to receive(:get_workflow_execution_history)
Expand Down Expand Up @@ -895,6 +895,32 @@ class NamespacedWorkflow < Temporal::Workflow
end
end

describe '#get_workflow_history' do
it 'gets full history with pagination' do
completed_event = Fabricate(:workflow_completed_event, result: nil)
response_1 = Fabricate(:workflow_execution_history, events: [completed_event], next_page_token: 'a')
response_2 = Fabricate(:workflow_execution_history, events: [completed_event], next_page_token: '')

allow(connection)
.to receive(:get_workflow_execution_history)
.and_return(response_1, response_2)

subject.get_workflow_history(namespace: namespace, workflow_id: workflow_id, run_id: run_id)

expect(connection)
.to have_received(:get_workflow_execution_history)
.with(namespace: namespace, workflow_id: workflow_id, run_id: run_id, next_page_token: nil)
.ordered

expect(connection)
.to have_received(:get_workflow_execution_history)
.with(namespace: namespace, workflow_id: workflow_id, run_id: run_id, next_page_token: 'a')
.ordered

expect(connection).to have_received(:get_workflow_execution_history).exactly(2).times
end
end

describe '#list_open_workflow_executions' do
let(:from) { Time.now - 600 }
let(:now) { Time.now }
Expand Down Expand Up @@ -977,7 +1003,7 @@ class NamespacedWorkflow < Temporal::Workflow
end
end

it 'returns the next page token and paginates correctly' do
it 'returns the next page token and paginates correctly' do
executions1 = subject.list_open_workflow_executions(namespace, from, max_page_size: 10)
executions1.map do |execution|
expect(execution).to be_an_instance_of(Temporal::Workflow::ExecutionInfo)
Expand Down Expand Up @@ -1009,7 +1035,7 @@ class NamespacedWorkflow < Temporal::Workflow
.once
end

it 'returns the next page and paginates correctly' do
it 'returns the next page and paginates correctly' do
executions1 = subject.list_open_workflow_executions(namespace, from, max_page_size: 10)
executions1.map do |execution|
expect(execution).to be_an_instance_of(Temporal::Workflow::ExecutionInfo)
Expand Down

0 comments on commit 0c9a0c7

Please sign in to comment.