Skip to content

Commit

Permalink
Remove cancelation commands when underlying futures are closed (#275)
Browse files Browse the repository at this point in the history
* Remove cancelation commands when underlying futures are closed

* Fix spec for timer command preservation

* Remove potentially flaky example spec
  • Loading branch information
jeffschoner authored Jan 4, 2024
1 parent 628960b commit cfcbdd3
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 3 deletions.
8 changes: 8 additions & 0 deletions lib/temporal/workflow/command_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ def fail
def time_out
@state = TIMED_OUT_STATE
end

def closed?
@state == COMPLETED_STATE ||
@state == CANCELED_STATE ||
@state == FAILED_STATE ||
@state == TIMED_OUT_STATE ||
@state == TERMINATED_STATE
end
end
end
end
2 changes: 1 addition & 1 deletion lib/temporal/workflow/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def run
state_manager.apply(window)
end

RunResult.new(commands: state_manager.commands, new_sdk_flags_used: state_manager.new_sdk_flags_used)
RunResult.new(commands: state_manager.final_commands, new_sdk_flags_used: state_manager.new_sdk_flags_used)
end

# Process queries using the pre-registered query handlers
Expand Down
22 changes: 20 additions & 2 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class StateManager
class UnsupportedEvent < Temporal::InternalError; end
class UnsupportedMarkerType < Temporal::InternalError; end

attr_reader :commands, :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals
attr_reader :local_time, :search_attributes, :new_sdk_flags_used, :sdk_flags, :first_task_signals

def initialize(dispatcher, config)
@dispatcher = dispatcher
Expand Down Expand Up @@ -87,6 +87,24 @@ def schedule(command)
[event_target_from(command_id, command), cancelation_id]
end

def final_commands
# Filter out any activity or timer cancellation commands if the underlying activity or
# timer has completed. This can occur when an activity or timer completes while a
# workflow task is being processed that would otherwise cancel this time or activity.
commands.filter do |command_pair|
case command_pair.last
when Command::CancelTimer
state_machine = command_tracker[command_pair.last.timer_id]
!state_machine.closed?
when Command::RequestActivityCancellation
state_machine = command_tracker[command_pair.last.activity_id]
!state_machine.closed?
else
true
end
end
end

def release?(release_name)
track_release(release_name) unless releases.key?(release_name)

Expand Down Expand Up @@ -149,7 +167,7 @@ def history_size

private

attr_reader :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config
attr_reader :commands, :dispatcher, :command_tracker, :marker_ids, :side_effects, :releases, :config

def use_signals_first(raw_events)
# The presence of SAVE_FIRST_TASK_SIGNALS implies HANDLE_SIGNALS_FIRST
Expand Down
83 changes: 83 additions & 0 deletions spec/unit/lib/temporal/workflow/state_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,89 @@ def test_order_one_task(*expected_sdk_flags)
end
end

describe "#final_commands" do
let(:dispatcher) { Temporal::Workflow::Dispatcher.new }
let(:state_manager) do
Temporal::Workflow::StateManager.new(dispatcher, config)
end

let(:config) { Temporal::Configuration.new }

it "preserves canceled activity or timer commands when not completed" do
schedule_activity_command = Temporal::Workflow::Command::ScheduleActivity.new
state_manager.schedule(schedule_activity_command)

start_timer_command = Temporal::Workflow::Command::StartTimer.new
state_manager.schedule(start_timer_command)

cancel_activity_command = Temporal::Workflow::Command::RequestActivityCancellation.new(
activity_id: schedule_activity_command.activity_id
)
state_manager.schedule(cancel_activity_command)

cancel_timer_command = Temporal::Workflow::Command::CancelTimer.new(
timer_id: start_timer_command.timer_id
)
state_manager.schedule(cancel_timer_command)

expect(state_manager.final_commands).to(
eq(
[
[1, schedule_activity_command],
[2, start_timer_command],
[3, cancel_activity_command],
[4, cancel_timer_command]
]
)
)
end

it "drop cancel activity command when completed" do
schedule_activity_command = Temporal::Workflow::Command::ScheduleActivity.new
state_manager.schedule(schedule_activity_command)

cancel_command = Temporal::Workflow::Command::RequestActivityCancellation.new(
activity_id: schedule_activity_command.activity_id
)
state_manager.schedule(cancel_command)

# Fake completing the activity
window = Temporal::Workflow::History::Window.new
# The fake assumes an activity event completed two events ago, so fix the event id to +2
window.add(
Temporal::Workflow::History::Event.new(
Fabricate(:api_activity_task_completed_event, event_id: schedule_activity_command.activity_id + 2)
)
)
state_manager.apply(window)

expect(state_manager.final_commands).to(eq([[1, schedule_activity_command]]))
end

it "drop cancel timer command when completed" do
start_timer_command = Temporal::Workflow::Command::StartTimer.new
state_manager.schedule(start_timer_command)

cancel_command = Temporal::Workflow::Command::CancelTimer.new(
timer_id: start_timer_command.timer_id
)
state_manager.schedule(cancel_command)

# Fake completing the timer
window = Temporal::Workflow::History::Window.new
# The fake assumes an activity event completed four events ago, so fix the event id to +4
window.add(
Temporal::Workflow::History::Event.new(
Fabricate(:api_timer_fired_event, event_id: start_timer_command.timer_id + 4)
)
)
state_manager.apply(window)

expect(state_manager.final_commands).to(eq([[1, start_timer_command]]))
end
end


describe '#search_attributes' do
let(:initial_search_attributes) do
{
Expand Down

0 comments on commit cfcbdd3

Please sign in to comment.