Skip to content

Commit

Permalink
fix: fix various wf bugs, ds undrain, nonreporting metric
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jan 4, 2025
1 parent e8a7e86 commit 1315c1f
Show file tree
Hide file tree
Showing 10 changed files with 333 additions and 212 deletions.
38 changes: 16 additions & 22 deletions packages/common/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,10 +977,10 @@ impl WorkflowCtx {
let sleep_location = self.cursor.current_location_for(&history_res);

// Slept before
let (deadline_ts, state, replay) = if let HistoryResult::Event(sleep) = history_res {
let (deadline_ts, state) = if let HistoryResult::Event(sleep) = history_res {
tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying sleep");

(sleep.deadline_ts, sleep.state, true)
(sleep.deadline_ts, sleep.state)
}
// Sleep
else {
Expand All @@ -996,12 +996,9 @@ impl WorkflowCtx {
)
.await?;

(deadline_ts, SleepState::Normal, false)
(deadline_ts, SleepState::Normal)
};

// Location of the signal event (comes after the sleep event)
let sleep_location2 = self.cursor.current_location_for(&history_res);

// Move to next event
self.cursor.update(&sleep_location);

Expand All @@ -1011,7 +1008,7 @@ impl WorkflowCtx {
.cursor
.compare_signal(self.version)
.map_err(GlobalError::raw)?;
let location = self.cursor.current_location_for(&history_res);
let signal_location = self.cursor.current_location_for(&history_res);

if let HistoryResult::Event(signal) = history_res {
tracing::debug!(
Expand All @@ -1024,34 +1021,31 @@ impl WorkflowCtx {
let signal = T::parse(&signal.name, &signal.body).map_err(GlobalError::raw)?;

// Move to next event
self.cursor.update(&location);
self.cursor.update(&signal_location);

// Short circuit
return Ok(Some(signal));
} else {
return Err(GlobalError::raw(WorkflowError::HistoryDiverged(format!(
"expected signal at {}, found nothing",
location,
signal_location,
))));
}
}

// Location of the signal event (comes after the sleep event)
let history_res = self
.cursor
.compare_signal(self.version)
.map_err(GlobalError::raw)?;
let signal_location = self.cursor.current_location_for(&history_res);
let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now());

// Duration is now 0, timeout is over
let signal = if duration <= 0 {
if !replay && duration < -50 {
tracing::warn!(
name=%self.name,
id=%self.workflow_id,
%duration,
"tried to sleep for a negative duration",
);
}

// After timeout is over, check once for signal
if matches!(state, SleepState::Normal) {
let mut ctx = ListenCtx::new(self, &sleep_location2);
let mut ctx = ListenCtx::new(self, &signal_location);

match T::listen(&mut ctx).await {
Ok(x) => Some(x),
Expand All @@ -1074,7 +1068,7 @@ impl WorkflowCtx {
let mut interval = tokio::time::interval(SIGNAL_RETRY);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut ctx = ListenCtx::new(self, &sleep_location2);
let mut ctx = ListenCtx::new(self, &signal_location);

loop {
interval.tick().await;
Expand Down Expand Up @@ -1107,7 +1101,7 @@ impl WorkflowCtx {
let mut interval = tokio::time::interval(SIGNAL_RETRY);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);

let mut ctx = ListenCtx::new(self, &sleep_location2);
let mut ctx = ListenCtx::new(self, &signal_location);

loop {
interval.tick().await;
Expand Down Expand Up @@ -1140,7 +1134,7 @@ impl WorkflowCtx {
.await?;

// Move to next event
self.cursor.update(&sleep_location2);
self.cursor.update(&signal_location);
} else if matches!(state, SleepState::Normal) {
self.db
.update_workflow_sleep_event_state(
Expand Down
Loading

0 comments on commit 1315c1f

Please sign in to comment.