diff --git a/packages/common/chirp-workflow/core/src/ctx/workflow.rs b/packages/common/chirp-workflow/core/src/ctx/workflow.rs index 463dc90a2f..fc6dfad09d 100644 --- a/packages/common/chirp-workflow/core/src/ctx/workflow.rs +++ b/packages/common/chirp-workflow/core/src/ctx/workflow.rs @@ -977,10 +977,10 @@ impl WorkflowCtx { let sleep_location = self.cursor.current_location_for(&history_res); // Slept before - let (deadline_ts, state, replay) = if let HistoryResult::Event(sleep) = history_res { + let (deadline_ts, state) = if let HistoryResult::Event(sleep) = history_res { tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying sleep"); - (sleep.deadline_ts, sleep.state, true) + (sleep.deadline_ts, sleep.state) } // Sleep else { @@ -996,12 +996,9 @@ impl WorkflowCtx { ) .await?; - (deadline_ts, SleepState::Normal, false) + (deadline_ts, SleepState::Normal) }; - // Location of the signal event (comes after the sleep event) - let sleep_location2 = self.cursor.current_location_for(&history_res); - // Move to next event self.cursor.update(&sleep_location); @@ -1011,7 +1008,7 @@ impl WorkflowCtx { .cursor .compare_signal(self.version) .map_err(GlobalError::raw)?; - let location = self.cursor.current_location_for(&history_res); + let signal_location = self.cursor.current_location_for(&history_res); if let HistoryResult::Event(signal) = history_res { tracing::debug!( @@ -1024,34 +1021,31 @@ impl WorkflowCtx { let signal = T::parse(&signal.name, &signal.body).map_err(GlobalError::raw)?; // Move to next event - self.cursor.update(&location); + self.cursor.update(&signal_location); // Short circuit return Ok(Some(signal)); } else { return Err(GlobalError::raw(WorkflowError::HistoryDiverged(format!( "expected signal at {}, found nothing", - location, + signal_location, )))); } } + // Location of the signal event (comes after the sleep event) + let history_res = self + .cursor + .compare_signal(self.version) + .map_err(GlobalError::raw)?; + let signal_location = self.cursor.current_location_for(&history_res); let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now()); // Duration is now 0, timeout is over let signal = if duration <= 0 { - if !replay && duration < -50 { - tracing::warn!( - name=%self.name, - id=%self.workflow_id, - %duration, - "tried to sleep for a negative duration", - ); - } - // After timeout is over, check once for signal if matches!(state, SleepState::Normal) { - let mut ctx = ListenCtx::new(self, &sleep_location2); + let mut ctx = ListenCtx::new(self, &signal_location); match T::listen(&mut ctx).await { Ok(x) => Some(x), @@ -1074,7 +1068,7 @@ impl WorkflowCtx { let mut interval = tokio::time::interval(SIGNAL_RETRY); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut ctx = ListenCtx::new(self, &sleep_location2); + let mut ctx = ListenCtx::new(self, &signal_location); loop { interval.tick().await; @@ -1107,7 +1101,7 @@ impl WorkflowCtx { let mut interval = tokio::time::interval(SIGNAL_RETRY); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let mut ctx = ListenCtx::new(self, &sleep_location2); + let mut ctx = ListenCtx::new(self, &signal_location); loop { interval.tick().await; @@ -1140,7 +1134,7 @@ impl WorkflowCtx { .await?; // Move to next event - self.cursor.update(&sleep_location2); + self.cursor.update(&signal_location); } else if matches!(state, SleepState::Normal) { self.db .update_workflow_sleep_event_state( diff --git a/packages/common/chirp-workflow/core/src/db/crdb_nats.rs b/packages/common/chirp-workflow/core/src/db/crdb_nats.rs index f19d8bec98..9c22ccd000 100644 --- a/packages/common/chirp-workflow/core/src/db/crdb_nats.rs +++ b/packages/common/chirp-workflow/core/src/db/crdb_nats.rs @@ -7,7 +7,7 @@ use std::{ use futures_util::StreamExt; use indoc::indoc; -use rivet_pools::prelude::NatsPool; +use rivet_pools::prelude::*; use sqlx::{pool::PoolConnection, Acquire, PgPool, Postgres}; use tokio::sync::Mutex; use tracing::Instrument; @@ -23,6 +23,9 @@ use crate::{ message, metrics, worker, }; +// HACK: We alias global error here because its hardcoded into the sql macros +type GlobalError = WorkflowError; + /// Max amount of workflows pulled from the database with each call to `pull_workflows`. const MAX_PULLED_WORKFLOWS: i64 = 50; // Base retry for query retry backoff @@ -58,6 +61,16 @@ impl DatabaseCrdbNats { } } + // Alias function for sql macro compatibility + async fn crdb(&self) -> WorkflowResult { + Ok(self.pool.clone()) + } + + // For sql macro + pub fn name(&self) -> &str { + "chirp_workflow_crdb_nats_engine" + } + /// Spawns a new thread and publishes a worker wake message to nats. fn wake_worker(&self) { let nats = self.nats.clone(); @@ -105,7 +118,7 @@ impl DatabaseCrdbNats { .message() .contains("TransactionRetryWithProtoRefreshError") => { - tracing::info!(message=%db_err.message(), "transaction retry"); + tracing::warn!(message=%db_err.message(), "transaction retry"); tokio::time::sleep(TXN_RETRY).await; } // Retry other errors with a backoff @@ -196,16 +209,17 @@ impl Database for DatabaseCrdbNats { let (actual_workflow_id,) = self .query(|| async { - sqlx::query_as::<_, (Uuid,)>(query) - .bind(workflow_id) - .bind(workflow_name) - .bind(rivet_util::timestamp::now()) - .bind(ray_id) - .bind(tags) - .bind(sqlx::types::Json(input)) - .fetch_one(&mut *self.conn().await?) - .await - .map_err(WorkflowError::Sqlx) + sql_fetch_one!( + [self, (Uuid,)] + query, + workflow_id, + workflow_name, + rivet_util::timestamp::now(), + ray_id, + tags, + sqlx::types::Json(input), + ) + .await }) .await?; @@ -217,18 +231,17 @@ impl Database for DatabaseCrdbNats { } async fn get_workflow(&self, workflow_id: Uuid) -> WorkflowResult> { - sqlx::query_as::<_, WorkflowRow>(indoc!( + sql_fetch_optional!( + [self, WorkflowRow] " SELECT workflow_id, input, output FROM db_workflow.workflows WHERE workflow_id = $1 ", - )) - .bind(workflow_id) - .fetch_optional(&mut *self.conn().await?) + workflow_id, + ) .await .map(|row| row.map(Into::into)) - .map_err(WorkflowError::Sqlx) } async fn pull_workflows( @@ -241,7 +254,8 @@ impl Database for DatabaseCrdbNats { // Select all workflows that have a wake condition let workflow_rows = self .query(|| async { - sqlx::query_as::<_, PulledWorkflowRow>(indoc!( + sql_fetch_all!( + [self, PulledWorkflowRow] " WITH pull_workflows AS ( @@ -357,17 +371,15 @@ impl Database for DatabaseCrdbNats { ) SELECT * FROM pull_workflows ", - )) - .bind(worker_instance_id) - .bind(filter) - .bind(rivet_util::timestamp::now()) - // Add padding to the tick interval so that the workflow deadline is never passed before its pulled. - // The worker sleeps internally to handle this - .bind(worker::TICK_INTERVAL.as_millis() as i64 + 1) - .bind(MAX_PULLED_WORKFLOWS) - .fetch_all(&mut *self.conn().await?) + worker_instance_id, + filter, + rivet_util::timestamp::now(), + // Add padding to the tick interval so that the workflow deadline is never passed before its pulled. + // The worker sleeps internally to handle this + worker::TICK_INTERVAL.as_millis() as i64 + 1, + MAX_PULLED_WORKFLOWS, + ) .await - .map_err(WorkflowError::Sqlx) }) .await?; @@ -388,7 +400,8 @@ impl Database for DatabaseCrdbNats { let start_instant2 = Instant::now(); // Fetch all events for all fetched workflows - let events = sqlx::query_as::<_, AmalgamEventRow>(indoc!( + let events = sql_fetch_all!( + [self, AmalgamEventRow] " -- Activity events SELECT @@ -610,11 +623,9 @@ impl Database for DatabaseCrdbNats { WHERE workflow_id = ANY($1) AND forgotten = FALSE ORDER BY workflow_id ASC, location2 ASC ", - )) - .bind(&workflow_ids) - .fetch_all(&mut *self.conn().await?) - .await - .map_err(WorkflowError::Sqlx)?; + &workflow_ids, + ) + .await?; let workflows = build_histories(workflow_rows, events)?; @@ -837,7 +848,8 @@ impl Database for DatabaseCrdbNats { ) -> WorkflowResult> { let signal = self .query(|| async { - sqlx::query_as::<_, SignalRow>(indoc!( + sql_fetch_optional!( + [self, SignalRow] " WITH -- Finds the oldest signal matching the signal name filter in either the normal signals table @@ -911,17 +923,15 @@ impl Database for DatabaseCrdbNats { ) SELECT * FROM next_signal ", - )) - .bind(workflow_id) - .bind(filter) - .bind(location) - .bind(version as i64) - .bind(rivet_util::timestamp::now()) - .bind(loop_location) - .fetch_optional(&mut *self.conn().await?) + workflow_id, + filter, + location, + version as i64, + rivet_util::timestamp::now(), + loop_location, + ) .await .map(|row| row.map(Into::into)) - .map_err(WorkflowError::Sqlx) }) .await?; @@ -937,23 +947,22 @@ impl Database for DatabaseCrdbNats { body: &serde_json::value::RawValue, ) -> WorkflowResult<()> { self.query(|| async { - sqlx::query(indoc!( + sql_execute!( + [self] " INSERT INTO db_workflow.signals ( signal_id, workflow_id, signal_name, body, ray_id, create_ts ) VALUES ($1, $2, $3, $4, $5, $6) ", - )) - .bind(signal_id) - .bind(workflow_id) - .bind(signal_name) - .bind(sqlx::types::Json(body)) - .bind(ray_id) - .bind(rivet_util::timestamp::now()) - .execute(&mut *self.conn().await?) + signal_id, + workflow_id, + signal_name, + sqlx::types::Json(body), + ray_id, + rivet_util::timestamp::now(), + ) .await - .map_err(WorkflowError::Sqlx) }) .await?; @@ -971,23 +980,22 @@ impl Database for DatabaseCrdbNats { body: &serde_json::value::RawValue, ) -> WorkflowResult<()> { self.query(|| async { - sqlx::query(indoc!( + sql_execute!( + [self] " INSERT INTO db_workflow.tagged_signals ( signal_id, tags, signal_name, body, ray_id, create_ts ) VALUES ($1, $2, $3, $4, $5, $6) ", - )) - .bind(signal_id) - .bind(tags) - .bind(signal_name) - .bind(sqlx::types::Json(body)) - .bind(ray_id) - .bind(rivet_util::timestamp::now()) - .execute(&mut *self.conn().await?) + signal_id, + tags, + signal_name, + sqlx::types::Json(body), + ray_id, + rivet_util::timestamp::now(), + ) .await - .map_err(WorkflowError::Sqlx) }) .await?; @@ -1009,7 +1017,8 @@ impl Database for DatabaseCrdbNats { loop_location: Option<&Location>, ) -> WorkflowResult<()> { self.query(|| async { - sqlx::query(indoc!( + sql_execute!( + [self] " WITH signal AS ( @@ -1028,20 +1037,18 @@ impl Database for DatabaseCrdbNats { ) SELECT 1 ", - )) - .bind(signal_id) - .bind(to_workflow_id) - .bind(signal_name) - .bind(sqlx::types::Json(body)) - .bind(ray_id) - .bind(rivet_util::timestamp::now()) - .bind(from_workflow_id) - .bind(location) - .bind(version as i64) - .bind(loop_location) - .execute(&mut *self.conn().await?) + signal_id, + to_workflow_id, + signal_name, + sqlx::types::Json(body), + ray_id, + rivet_util::timestamp::now(), + from_workflow_id, + location, + version as i64, + loop_location, + ) .await - .map_err(WorkflowError::Sqlx) }) .await?; @@ -1063,7 +1070,8 @@ impl Database for DatabaseCrdbNats { loop_location: Option<&Location>, ) -> WorkflowResult<()> { self.query(|| async { - sqlx::query(indoc!( + sql_execute!( + [self] " WITH signal AS ( @@ -1082,20 +1090,18 @@ impl Database for DatabaseCrdbNats { ) SELECT 1 ", - )) - .bind(signal_id) - .bind(tags) - .bind(signal_name) - .bind(sqlx::types::Json(body)) - .bind(ray_id) - .bind(rivet_util::timestamp::now()) - .bind(from_workflow_id) - .bind(location) - .bind(version as i64) - .bind(loop_location) - .execute(&mut *self.conn().await?) + signal_id, + tags, + signal_name, + sqlx::types::Json(body), + ray_id, + rivet_util::timestamp::now(), + from_workflow_id, + location, + version as i64, + loop_location, + ) .await - .map_err(WorkflowError::Sqlx) }) .await?; @@ -1251,7 +1257,8 @@ impl Database for DatabaseCrdbNats { let mut conn = self.conn().await?; let mut tx = conn.begin().await.map_err(WorkflowError::Sqlx)?; - sqlx::query(indoc!( + sql_execute!( + [self, @tx &mut tx] " INSERT INTO db_workflow.workflow_loop_events ( workflow_id, @@ -1268,20 +1275,19 @@ impl Database for DatabaseCrdbNats { output = $5 RETURNING 1 ", - )) - .bind(workflow_id) - .bind(location) - .bind(version as i64) - .bind(iteration as i64) - .bind(output.map(sqlx::types::Json)) - .bind(loop_location) - .execute(&mut *tx) - .await - .map_err(WorkflowError::Sqlx)?; + workflow_id, + location, + version as i64, + iteration as i64, + output.map(sqlx::types::Json), + loop_location, + ) + .await?; // 0-th iteration is the initial insertion if iteration != 0 { - sqlx::query(indoc!( + sql_execute!( + [self, @tx &mut tx] " WITH forget_activity_events AS ( @@ -1328,7 +1334,7 @@ impl Database for DatabaseCrdbNats { loop_location2_hash = $2 AND forgotten = FALSE RETURNING 1 - ), + ), forget_loop_events AS ( UPDATE db_workflow.workflow_loop_events@workflow_loop_events_workflow_id_loop_location2_hash_idx SET forgotten = TRUE @@ -1338,6 +1344,15 @@ impl Database for DatabaseCrdbNats { forgotten = FALSE RETURNING 1 ), + forget_sleep_events AS ( + UPDATE db_workflow.workflow_sleep_events@workflow_sleep_events_workflow_id_loop_location2_hash_idx + SET forgotten = TRUE + WHERE + workflow_id = $1 AND + loop_location2_hash = $2 AND + forgotten = FALSE + RETURNING 1 + ), forget_branch_events AS ( UPDATE db_workflow.workflow_branch_events@workflow_branch_events_workflow_id_loop_location_hash_idx SET forgotten = TRUE @@ -1367,12 +1382,10 @@ impl Database for DatabaseCrdbNats { ) SELECT 1 ", - )) - .bind(workflow_id) - .bind(hash_location(location)?) - .execute(&mut *tx) - .await - .map_err(WorkflowError::Sqlx)?; + workflow_id, + hash_location(location), + ) + .await?; } tx.commit().await.map_err(WorkflowError::Sqlx)?; @@ -1837,12 +1850,62 @@ mod types { // IMPORTANT: Must match the hashing algorithm used in the `db-workflow` `loop_location2_hash` generated /// column expression. - pub fn hash_location(location: &Location) -> WorkflowResult> { - Ok(md5::compute( - serde_json::to_vec(&serialize_location(location)) - .map_err(WorkflowError::SerializeLocation)?, - ) - .to_vec()) + pub fn hash_location(location: &Location) -> Vec { + let mut s = "[".to_string(); + + let mut loc_iter = location.iter(); + + // First coord + if let Some(coord) = loc_iter.next() { + let mut coord_iter = coord.iter(); + + s.push_str("["); + + // First part + if let Some(part) = coord_iter.next() { + s.push_str(&part.to_string()); + } + + // Rest + for part in coord_iter { + // NOTE: The space here is important as it mimics the default behavior of casting JSONB to + // TEXT in CRDB. + s.push_str(", "); + s.push_str(&part.to_string()); + } + + s.push_str("]"); + } + + // Rest + for coord in loc_iter { + // NOTE: The space here is important as it mimics the default behavior of casting JSONB to + // TEXT in CRDB. + s.push_str(", "); + + let mut coord_iter = coord.iter(); + + s.push_str("["); + + // First part + if let Some(part) = coord_iter.next() { + s.push_str(&part.to_string()); + } + + // Rest + for part in coord_iter { + // NOTE: The space here is important as it mimics the default behavior of casting JSONB to + // TEXT in CRDB. + s.push_str(", "); + s.push_str(&part.to_string()); + } + + s.push_str("]"); + } + + s.push_str("]"); + + md5::compute(s).to_vec() } /// Takes all workflow events (each with their own location) and combines them via enum into a hashmap of the diff --git a/packages/common/chirp-workflow/core/src/error.rs b/packages/common/chirp-workflow/core/src/error.rs index dd7ad405ec..86b97089c2 100644 --- a/packages/common/chirp-workflow/core/src/error.rs +++ b/packages/common/chirp-workflow/core/src/error.rs @@ -136,7 +136,7 @@ pub enum WorkflowError { BuildSqlx(sqlx::Error), #[error("sql error: {0}")] - Sqlx(sqlx::Error), + Sqlx(#[from] sqlx::Error), #[error("max sql retries, last error: {0}")] MaxSqlRetries(sqlx::Error), diff --git a/packages/common/chirp-workflow/core/src/worker.rs b/packages/common/chirp-workflow/core/src/worker.rs index 39207289d0..4025972c9a 100644 --- a/packages/common/chirp-workflow/core/src/worker.rs +++ b/packages/common/chirp-workflow/core/src/worker.rs @@ -115,7 +115,6 @@ impl Worker { workflow.workflow_id, &workflow.workflow_name, ); - let wake_deadline_ts = workflow.wake_deadline_ts; let ctx = WorkflowCtx::new( self.registry.clone(), self.db.clone(), @@ -127,11 +126,6 @@ impl Worker { tokio::task::spawn( async move { - // Sleep until deadline - if let Some(wake_deadline_ts) = wake_deadline_ts { - utils::time::sleep_until_ts(wake_deadline_ts as u64).await; - } - if let Err(err) = ctx.run().await { tracing::error!(?err, "unhandled error"); } diff --git a/packages/common/pools/src/utils/sql_query_macros.rs b/packages/common/pools/src/utils/sql_query_macros.rs index aa213f7cd3..3b6926b3df 100644 --- a/packages/common/pools/src/utils/sql_query_macros.rs +++ b/packages/common/pools/src/utils/sql_query_macros.rs @@ -22,6 +22,16 @@ macro_rules! __sql_query_metrics_acquire { }; } +#[macro_export] +macro_rules! __opt_indoc { + ($lit:literal) => { + indoc!($lit) + }; + ($other:expr) => { + $other + }; +} + #[macro_export] macro_rules! __sql_query_metrics_start { ($ctx:expr, $action:expr, $acquire_timer:ident, $start_timer:ident) => {{ @@ -118,11 +128,11 @@ macro_rules! __sql_acquire { #[macro_export] macro_rules! __sql_query { - ([$ctx:expr, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { async { use sqlx::Acquire; - let query = sqlx::query(indoc!($sql)) + let query = sqlx::query($crate::__opt_indoc!($sql)) $( .bind($bind) )*; @@ -139,10 +149,10 @@ macro_rules! __sql_query { res } - }; - ([$ctx:expr, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => { async { - let query = sqlx::query(indoc!($sql)) + let query = sqlx::query($crate::__opt_indoc!($sql)) $( .bind($bind) )*; @@ -155,19 +165,19 @@ macro_rules! __sql_query { res } - }; - ([$ctx:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query!([$ctx, &$ctx.crdb().await?] $sql, $($bind),*) - }; + }; } #[macro_export] macro_rules! __sql_query_as { - ([$ctx:expr, $rv:ty, $action:ident, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $action:ident, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { async { use sqlx::Acquire; - let query = sqlx::query_as::<_, $rv>(indoc!($sql)) + let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) $( .bind($bind) )*; @@ -184,10 +194,10 @@ macro_rules! __sql_query_as { res } - }; - ([$ctx:expr, $rv:ty, $action:ident, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty, $action:ident, @tx $tx:expr] $sql:expr, $($bind:expr),* $(,)?) => { async { - let query = sqlx::query_as::<_, $rv>(indoc!($sql)) + let query = sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) $( .bind($bind) )*; @@ -200,93 +210,93 @@ macro_rules! __sql_query_as { res } - }; - ([$ctx:expr, $rv:ty, $action:ident] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty, $action:ident] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, $action, &$ctx.crdb().await?] $sql, $($bind),*) - }; + }; } /// Returns a query without being wrapped in an async block, and therefore cannot time the query. /// Used for the `fetch` function. #[macro_export] macro_rules! __sql_query_as_raw { - ([$ctx:expr, $rv:ty, $action:ident, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $action:ident, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { // We can't record metrics for this because we can't move the `await` in to this macro - sqlx::query_as::<_, $rv>(indoc!($sql)) + sqlx::query_as::<_, $rv>($crate::__opt_indoc!($sql)) $( .bind($bind) )* .$action($crdb) - }; + }; // TODO: This doesn't work with `fetch` - ([$ctx:expr, $rv:ty, $action:ident] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $action:ident] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as_raw!([$ctx, $rv, $action, &$ctx.crdb().await?] $sql, $($bind),*) - }; + }; } // MARK: Specific actions #[macro_export] macro_rules! sql_execute { - ($($arg:tt)*) => { + ($($arg:tt)*) => { __sql_query!($($arg)*) - }; + }; } #[macro_export] macro_rules! sql_fetch { - ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as_raw!([$ctx, $rv, fetch, $crdb] $sql, $($bind),*) - }; + }; } #[macro_export] macro_rules! sql_fetch_all { - ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_all, $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_all, @tx $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_all] $sql, $($bind),*) - }; + }; } #[macro_export] macro_rules! sql_fetch_many { - ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_many, $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_many, @tx $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_many] $sql, $($bind),*) - }; + }; } #[macro_export] macro_rules! sql_fetch_one { - ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_one, $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_one, @tx $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_one] $sql, $($bind),*) - }; + }; } #[macro_export] macro_rules! sql_fetch_optional { - ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + ([$ctx:expr, $rv:ty, $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_optional, $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty, @tx $crdb:expr] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_optional, @tx $crdb] $sql, $($bind),*) - }; - ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { + }; + ([$ctx:expr, $rv:ty] $sql:expr, $($bind:expr),* $(,)?) => { __sql_query_as!([$ctx, $rv, fetch_optional] $sql, $($bind),*) - }; + }; } diff --git a/packages/infra/server/src/util/wf/history.rs b/packages/infra/server/src/util/wf/history.rs index a975660ce8..2186f31109 100644 --- a/packages/infra/server/src/util/wf/history.rs +++ b/packages/infra/server/src/util/wf/history.rs @@ -378,7 +378,10 @@ pub fn build(event_rows: Vec) -> Result> { }; let curr_coord_head = event.location.tail().expect("empty location").head(); - assert!(last_coord_head <= curr_coord_head, "invalid history"); + // assert!(last_coord_head <= curr_coord_head, "invalid history"); + if last_coord_head > curr_coord_head { + tracing::error!("============ THIS WORKFLOW HAS INVALID HISTORY ============"); + } let offset = if skip { 0 diff --git a/packages/infra/server/src/util/wf/mod.rs b/packages/infra/server/src/util/wf/mod.rs index b844fc26c3..fac74c530c 100644 --- a/packages/infra/server/src/util/wf/mod.rs +++ b/packages/infra/server/src/util/wf/mod.rs @@ -56,7 +56,7 @@ pub enum WorkflowState { pub struct WorkflowRow { workflow_id: Uuid, workflow_name: String, - tags: Option, + tags: serde_json::Value, create_ts: i64, input: serde_json::Value, output: Option, @@ -69,6 +69,7 @@ pub struct WorkflowRow { #[derive(Debug, sqlx::FromRow)] pub struct HistoryWorkflowRow { workflow_name: String, + tags: serde_json::Value, input: serde_json::Value, output: Option, error: Option, @@ -325,6 +326,7 @@ pub async fn print_history( " SELECT workflow_name, + tags, input, output, error, @@ -591,12 +593,21 @@ pub async fn print_history( ); if !exclude_json { + println!( + "{} tags {}", + style("|").dim(), + indent_string( + &colored_json(&workflow.tags)?, + style("| ").dim().to_string(), + true + ) + ); println!( "{} input {}", style("|").dim(), indent_string( &colored_json(&workflow.input)?, - style(" | ").dim().to_string(), + style("| ").dim().to_string(), true ) ); @@ -882,11 +893,11 @@ pub async fn print_history( if !exclude_json { println!( - " {} output {}", + "{} output {}", style("|").blue(), indent_string( &colored_json(&output)?, - style(" | ").blue().to_string(), + style("| ").blue().to_string(), true ) ); @@ -937,8 +948,7 @@ mod table { pub workflow_name: String, #[tabled(display_with = "display_state")] pub state: WorkflowState, - #[tabled(display_with = "display_option")] - pub tags: Option, + pub tags: String, } pub fn workflows(workflows: Vec) -> Result<()> { @@ -957,7 +967,7 @@ mod table { } else { WorkflowState::Dead }, - tags: w.tags.as_ref().map(colored_json_ugly).transpose()?, + tags: colored_json_ugly(&w.tags)?, }) }) .collect::>>()?; @@ -977,11 +987,4 @@ mod table { WorkflowState::Dead => style("dead").red().to_string(), } } - - pub(crate) fn display_option(item: &Option) -> String { - match item { - Some(s) => s.to_string(), - None => String::new(), - } - } } diff --git a/packages/services/cluster/src/metrics.rs b/packages/services/cluster/src/metrics.rs index ed31df9b9f..6f74f7317d 100644 --- a/packages/services/cluster/src/metrics.rs +++ b/packages/services/cluster/src/metrics.rs @@ -88,7 +88,7 @@ lazy_static::lazy_static! { pub static ref NONREPORTING_SERVER: IntGaugeVec = register_int_gauge_vec_with_registry!( "provision_nonreporting_server", "Servers without reporting Prometheus metrics.", - &["cluster_id", "datacenter_id", "server_id", "provider_datacenter_id", "pool_type"], + &["cluster_id", "datacenter_id", "server_id", "provider_datacenter_id", "datacenter_name_id", "pool_type"], *REGISTRY, ).unwrap(); } diff --git a/packages/services/ds/src/workers/mod.rs b/packages/services/ds/src/workers/mod.rs index d75bafa4c8..f04badaf7f 100644 --- a/packages/services/ds/src/workers/mod.rs +++ b/packages/services/ds/src/workers/mod.rs @@ -1,4 +1,5 @@ mod drain_all; +mod undrain_all; mod webhook; -chirp_worker::workers![drain_all,]; +chirp_worker::workers![drain_all, undrain_all]; diff --git a/packages/services/ds/src/workers/undrain_all.rs b/packages/services/ds/src/workers/undrain_all.rs new file mode 100644 index 0000000000..7038432215 --- /dev/null +++ b/packages/services/ds/src/workers/undrain_all.rs @@ -0,0 +1,53 @@ +use std::convert::TryInto; + +use chirp_worker::prelude::*; +use proto::backend::pkg::*; + +#[worker(name = "ds-undrain-all")] +async fn worker(ctx: &OperationContext) -> GlobalResult<()> { + let server_rows = if let Some(nomad_node_id) = &ctx.nomad_node_id { + sql_fetch_all!( + [ctx, (Uuid,)] + " + SELECT s.server_id + FROM db_ds.servers AS s + JOIN db_ds.server_nomad AS sn + ON s.server_id = sn.server_id + WHERE + sn.nomad_node_id = $1 AND + s.destroy_ts IS NULL + ", + nomad_node_id, + ) + .await? + } else if let Some(pegboard_client_id) = &ctx.pegboard_client_id { + sql_fetch_all!( + [ctx, (Uuid,)] + " + SELECT s.server_id + FROM db_ds.servers AS s + JOIN db_ds.servers_pegboard AS spb + ON s.server_id = spb.server_id + JOIN db_pegboard.actors AS a + ON spb.pegboard_actor_id = a.actor_id + WHERE + a.client_id = $1 AND + s.destroy_ts IS NULL + ", + pegboard_client_id, + ) + .await? + } else { + bail!("neither `nomad_node_id` nor `pegboard_client_id` set"); + }; + + for (server_id,) in server_rows { + chirp_workflow::compat::signal(ctx, crate::workflows::server::Undrain { }) + .await? + .tag("server_id", server_id) + .send() + .await?; + } + + Ok(()) +}