Skip to content

Commit

Permalink
Merge pull request coinbase#86 from stripe-private-oss-forks/calum-pa…
Browse files Browse the repository at this point in the history
…ginated-workflows

added paginated list_workflows_executions
  • Loading branch information
calum-stripe authored and GitHub Enterprise committed Apr 14, 2022
2 parents 350720d + 8fa91a7 commit d1140c3
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 6 deletions.
14 changes: 14 additions & 0 deletions lib/temporal/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,20 @@ def list_closed_workflow_executions(namespace, from, to = Time.now, filter: {})

fetch_executions(:closed, { namespace: namespace, from: from, to: to }.merge(filter))
end

# TODO: (calum, 2022-06-01) remove this once we have a better understanding of the how to do pagination on these temporal-ruby APIs
def list_workflow_executions_paginated(namespace, query, next_page_token: nil)
response = connection.list_workflow_executions(namespace: namespace, query: query, next_page_token: next_page_token)

executions = response.executions.map do |raw_execution|
Temporal::Workflow::ExecutionInfo.generate_from(raw_execution)
end

{
executions: executions,
next_page_token: response.next_page_token
}
end

def get_cron_schedule(namespace, workflow_id, run_id: nil)
history_response = connection.get_workflow_execution_history(
Expand Down
9 changes: 6 additions & 3 deletions lib/temporal/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@

module Temporal
class Configuration
Connection = Struct.new(:type, :host, :port, keyword_init: true)
Connection = Struct.new(:type, :host, :port, :options, keyword_init: true)
Execution = Struct.new(:namespace, :task_queue, :timeouts, :headers, keyword_init: true)

attr_reader :timeouts, :error_handlers
attr_writer :converter
attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers
attr_accessor :connection_type, :host, :port, :logger, :metrics_adapter, :namespace, :task_queue, :headers, :max_page_size

# See https://docs.temporal.io/blog/activity-timeouts/ for general docs.
# We want an infinite execution timeout for cron schedules and other perpetual workflows.
Expand Down Expand Up @@ -79,7 +79,10 @@ def for_connection
Connection.new(
type: connection_type,
host: host,
port: port
port: port,
options: {
max_page_size: max_page_size
},
).freeze
end

Expand Down
3 changes: 2 additions & 1 deletion lib/temporal/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ def self.generate(configuration)
connection_class = CLIENT_TYPES_MAP[configuration.type]
host = configuration.host
port = configuration.port
options = configuration.options

hostname = `hostname`
thread_id = Thread.current.object_id
identity = "#{thread_id}@#{hostname}"

connection_class.new(host, port, identity)
connection_class.new(host, port, identity, options)
end
end
end
10 changes: 8 additions & 2 deletions lib/temporal/connection/grpc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -421,8 +421,14 @@ def list_closed_workflow_executions(namespace:, from:, to:, next_page_token: nil
client.list_closed_workflow_executions(request)
end

def list_workflow_executions
raise NotImplementedError
def list_workflow_executions(namespace:, query:, next_page_token: nil)
request = Temporal::Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
namespace: namespace,
page_size: options[:max_page_size],
next_page_token: next_page_token,
query: query
)
client.list_workflow_executions(request)
end

def list_archived_workflow_executions
Expand Down

0 comments on commit d1140c3

Please sign in to comment.