diff --git a/packages/common/chirp-workflow/core/src/builder/common/signal.rs b/packages/common/chirp-workflow/core/src/builder/common/signal.rs index 72d00706a2..88ef0323de 100644 --- a/packages/common/chirp-workflow/core/src/builder/common/signal.rs +++ b/packages/common/chirp-workflow/core/src/builder/common/signal.rs @@ -6,12 +6,14 @@ use uuid::Uuid; use crate::{ builder::BuilderError, db::DatabaseHandle, error::WorkflowError, metrics, signal::Signal, + workflow::Workflow, }; pub struct SignalBuilder { db: DatabaseHandle, ray_id: Uuid, body: T, + to_workflow_name: Option<&'static str>, to_workflow_id: Option, tags: serde_json::Map, error: Option, @@ -23,13 +25,14 @@ impl SignalBuilder { db, ray_id, body, + to_workflow_name: None, to_workflow_id: None, tags: serde_json::Map::new(), error: None, } } - pub fn to_workflow(mut self, workflow_id: Uuid) -> Self { + pub fn to_workflow_id(mut self, workflow_id: Uuid) -> Self { if self.error.is_some() { return self; } @@ -39,6 +42,16 @@ impl SignalBuilder { self } + pub fn to_workflow(mut self) -> Self { + if self.error.is_some() { + return self; + } + + self.to_workflow_name = Some(W::NAME); + + self + } + pub fn tags(mut self, tags: serde_json::Value) -> Self { if self.error.is_some() { return self; @@ -81,16 +94,41 @@ impl SignalBuilder { .map_err(WorkflowError::SerializeSignalBody) .map_err(GlobalError::raw)?; - match (self.to_workflow_id, self.tags.is_empty()) { - (Some(workflow_id), true) => { - tracing::debug!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal"); + match ( + self.to_workflow_name, + self.to_workflow_id, + self.tags.is_empty(), + ) { + (Some(workflow_name), None, _) => { + tracing::debug!( + signal_name=%T::NAME, + to_workflow_name=%workflow_name, + tags=?self.tags, + %signal_id, + "dispatching signal via workflow name and tags" + ); + + let workflow_id = self + .db + .find_workflow(workflow_name, &serde_json::Value::Object(self.tags)) + .await? + .ok_or(WorkflowError::WorkflowNotFound) + .map_err(GlobalError::raw)?; + + self.db + .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val) + .await + .map_err(GlobalError::raw)?; + } + (None, Some(workflow_id), true) => { + tracing::debug!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal via workflow id"); self.db .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, &input_val) .await .map_err(GlobalError::raw)?; } - (None, false) => { + (None, None, false) => { tracing::debug!(signal_name=%T::NAME, tags=?self.tags, %signal_id, "dispatching tagged signal"); self.db @@ -104,8 +142,24 @@ impl SignalBuilder { .await .map_err(GlobalError::raw)?; } - (Some(_), false) => return Err(BuilderError::WorkflowIdAndTags.into()), - (None, true) => return Err(BuilderError::NoWorkflowIdOrTags.into()), + (Some(_), Some(_), _) => { + return Err(BuilderError::InvalidSignalSend( + "cannot provide both workflow and workflow id", + ) + .into()) + } + (None, Some(_), false) => { + return Err(BuilderError::InvalidSignalSend( + "cannot provide tags if providing a workflow id", + ) + .into()) + } + (None, None, true) => { + return Err(BuilderError::InvalidSignalSend( + "no workflow, workflow id, or tags provided", + ) + .into()) + } } metrics::SIGNAL_PUBLISHED diff --git a/packages/common/chirp-workflow/core/src/builder/mod.rs b/packages/common/chirp-workflow/core/src/builder/mod.rs index 13137f00c5..f8cffed3cc 100644 --- a/packages/common/chirp-workflow/core/src/builder/mod.rs +++ b/packages/common/chirp-workflow/core/src/builder/mod.rs @@ -5,10 +5,8 @@ pub mod workflow; pub(crate) enum BuilderError { #[error("tags must be a JSON map")] TagsNotMap, - #[error("cannot call `to_workflow` and set tags on the same signal")] - WorkflowIdAndTags, - #[error("must call `to_workflow` or set tags on signal")] - NoWorkflowIdOrTags, + #[error("invalid signal send: {0}")] + InvalidSignalSend(&'static str), #[error("cannot dispatch a workflow/signal from an operation within a workflow execution. trigger it from the workflow's body")] CannotDispatchFromOpInWorkflow, #[error("using tags on a sub workflow ({0}) with `.output()` is not supported")] diff --git a/packages/common/chirp-workflow/core/src/builder/workflow/signal.rs b/packages/common/chirp-workflow/core/src/builder/workflow/signal.rs index 4bc3548a1c..0bb3b9375c 100644 --- a/packages/common/chirp-workflow/core/src/builder/workflow/signal.rs +++ b/packages/common/chirp-workflow/core/src/builder/workflow/signal.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::{ builder::BuilderError, ctx::WorkflowCtx, error::WorkflowError, history::cursor::HistoryResult, - metrics, signal::Signal, + metrics, signal::Signal, workflow::Workflow, }; pub struct SignalBuilder<'a, T: Signal + Serialize> { @@ -14,6 +14,7 @@ pub struct SignalBuilder<'a, T: Signal + Serialize> { version: usize, body: T, + to_workflow_name: Option<&'static str>, to_workflow_id: Option, tags: serde_json::Map, error: Option, @@ -26,13 +27,14 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { version, body, + to_workflow_name: None, to_workflow_id: None, tags: serde_json::Map::new(), error: None, } } - pub fn to_workflow(mut self, workflow_id: Uuid) -> Self { + pub fn to_workflow_id(mut self, workflow_id: Uuid) -> Self { if self.error.is_some() { return self; } @@ -42,6 +44,16 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { self } + pub fn to_workflow(mut self) -> Self { + if self.error.is_some() { + return self; + } + + self.to_workflow_name = Some(W::NAME); + + self + } + pub fn tags(mut self, tags: serde_json::Value) -> Self { if self.error.is_some() { return self; @@ -111,15 +123,53 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { .map_err(WorkflowError::SerializeSignalBody) .map_err(GlobalError::raw)?; - match (self.to_workflow_id, self.tags.is_empty()) { - (Some(workflow_id), true) => { + match ( + self.to_workflow_name, + self.to_workflow_id, + self.tags.is_empty(), + ) { + (Some(workflow_name), None, _) => { + tracing::debug!( + name=%self.ctx.name(), + id=%self.ctx.workflow_id(), + signal_name=%T::NAME, + to_workflow_name=%workflow_name, + %signal_id, + "dispatching signal via workflow name and tags" + ); + + let workflow_id = self + .ctx + .db() + .find_workflow(workflow_name, &serde_json::Value::Object(self.tags)) + .await? + .ok_or(WorkflowError::WorkflowNotFound) + .map_err(GlobalError::raw)?; + + self.ctx + .db() + .publish_signal_from_workflow( + self.ctx.workflow_id(), + &location, + self.version, + self.ctx.ray_id(), + workflow_id, + signal_id, + T::NAME, + &input_val, + self.ctx.loop_location(), + ) + .await + .map_err(GlobalError::raw)?; + } + (None, Some(workflow_id), true) => { tracing::debug!( name=%self.ctx.name(), id=%self.ctx.workflow_id(), signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, - "dispatching signal" + "dispatching signal via workflow id" ); self.ctx @@ -138,7 +188,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { .await .map_err(GlobalError::raw)?; } - (None, false) => { + (None, None, false) => { tracing::debug!( name=%self.ctx.name(), id=%self.ctx.workflow_id(), @@ -164,8 +214,24 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> { .await .map_err(GlobalError::raw)?; } - (Some(_), false) => return Err(BuilderError::WorkflowIdAndTags.into()), - (None, true) => return Err(BuilderError::NoWorkflowIdOrTags.into()), + (Some(_), Some(_), _) => { + return Err(BuilderError::InvalidSignalSend( + "cannot provide both workflow and workflow id", + ) + .into()) + } + (None, Some(_), false) => { + return Err(BuilderError::InvalidSignalSend( + "cannot provide tags if providing a workflow id", + ) + .into()) + } + (None, None, true) => { + return Err(BuilderError::InvalidSignalSend( + "no workflow, workflow id, or tags provided", + ) + .into()) + } } metrics::SIGNAL_PUBLISHED