Skip to content

Commit

Permalink
feat: fdb sqlite workflows driver
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jan 24, 2025
1 parent 057e98b commit 9a43f01
Show file tree
Hide file tree
Showing 56 changed files with 6,078 additions and 478 deletions.
22 changes: 22 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion Cargo.toml

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion packages/common/chirp-workflow/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,17 @@ license.workspace = true
edition.workspace = true

[dependencies]
anyhow = "1.0"
async-trait = "0.1.80"
chirp-client.workspace = true
chirp-workflow-macros.workspace = true
cjson = "0.1"
fdb-util.workspace = true
formatted-error.workspace = true
foundationdb.workspace = true
futures-util = "0.3"
global-error.workspace = true
include_dir = "0.7.4"
indoc = "2.0.5"
lazy_static = "1.4"
md5 = "0.7.0"
Expand All @@ -29,11 +33,13 @@ rivet-runtime.workspace = true
rivet-util.workspace = true
serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
sqlite-util.workspace = true
strum = { version = "0.26", features = ["derive"] }
thiserror = "1.0.59"
tokio = { version = "1.40.0", features = ["full"] }
tokio-util = "0.7"
tracing = "0.1.40"
tracing-logfmt.workspace = true
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.8.0", features = ["v4", "serde"] }

Expand All @@ -44,8 +50,10 @@ features = [
"postgres",
"uuid",
"json",
"ipnetwork"
"ipnetwork",
"sqlite"
]

[dev-dependencies]
anyhow = "1.0.82"
rand = "0.8"
18 changes: 7 additions & 11 deletions packages/common/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
common,
message::{MessageCtx, SubscriptionHandle},
},
db::{DatabaseCrdbNats, DatabaseHandle},
db::{Database, DatabaseCrdbNats, DatabaseHandle},
message::{AsTags, Message},
operation::{Operation, OperationInput},
signal::Signal,
Expand Down Expand Up @@ -105,20 +105,16 @@ where
msg_ctx.subscribe::<M>(tags).await.map_err(GlobalError::raw)
}

// Get pool as a trait object
async fn db_from_ctx<B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
) -> GlobalResult<DatabaseHandle> {
let crdb = ctx.crdb().await?;
let nats = ctx.conn().nats().await?;

Ok(DatabaseCrdbNats::from_pools(crdb, nats))
DatabaseCrdbNats::from_pools(ctx.pools().clone())
.map(|db| db as DatabaseHandle)
.map_err(Into::into)
}

// Get crdb pool as a trait object
pub async fn db_from_pools(pools: &rivet_pools::Pools) -> GlobalResult<DatabaseHandle> {
let crdb = pools.crdb()?;
let nats = pools.nats()?;

Ok(DatabaseCrdbNats::from_pools(crdb, nats))
DatabaseCrdbNats::from_pools(pools.clone())
.map(|db| db as DatabaseHandle)
.map_err(Into::into)
}
7 changes: 6 additions & 1 deletion packages/common/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::{
#[derive(Clone)]
pub struct ActivityCtx {
workflow_id: Uuid,
workflow_name: String,
ray_id: Uuid,
name: &'static str,
ts: i64,
Expand All @@ -33,6 +34,7 @@ pub struct ActivityCtx {
impl ActivityCtx {
pub async fn new(
workflow_id: Uuid,
workflow_name: String,
db: DatabaseHandle,
config: &rivet_config::Config,
conn: &rivet_connection::Connection,
Expand Down Expand Up @@ -60,6 +62,7 @@ impl ActivityCtx {

Ok(ActivityCtx {
workflow_id,
workflow_name,
ray_id,
name,
ts,
Expand Down Expand Up @@ -94,9 +97,11 @@ impl ActivityCtx {
.await
}

// TODO: Theres nothing preventing this from being able to be called from the workflow ctx also, but for
// now its only in the activity ctx so it isn't called again during workflow retries
pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> {
self.db
.update_workflow_tags(self.workflow_id, tags)
.update_workflow_tags(self.workflow_id, &self.workflow_name, tags)
.await
.map_err(GlobalError::raw)
}
Expand Down
2 changes: 1 addition & 1 deletion packages/common/chirp-workflow/core/src/ctx/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use global_error::{GlobalError, GlobalResult};
use uuid::Uuid;

/// Poll interval when polling for a sub workflow in-process
pub const SUB_WORKFLOW_RETRY: Duration = Duration::from_millis(150);
pub const SUB_WORKFLOW_RETRY: Duration = Duration::from_millis(500);
/// Time to delay a workflow from retrying after an error
pub const RETRY_TIMEOUT_MS: usize = 2000;
pub const WORKFLOW_TIMEOUT: Duration = Duration::from_secs(60);
Expand Down
1 change: 1 addition & 0 deletions packages/common/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl OperationCtx {

/// Creates a signal builder.
pub fn signal<T: Signal + Serialize>(&self, body: T) -> builder::signal::SignalBuilder<T> {
// TODO: Add check for from_workflow so you cant dispatch a signal
builder::signal::SignalBuilder::new(self.db.clone(), self.ray_id, body)
}
}
Expand Down
35 changes: 25 additions & 10 deletions packages/common/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
message::{SubscriptionHandle, TailAnchor, TailAnchorResponse},
MessageCtx,
},
db::{DatabaseCrdbNats, DatabaseHandle},
db::{Database, DatabaseHandle},
message::{AsTags, Message, NatsMessage},
operation::{Operation, OperationInput},
signal::Signal,
Expand All @@ -34,14 +34,28 @@ pub struct TestCtx {
}

impl TestCtx {
pub async fn from_env(test_name: &str) -> TestCtx {
pub async fn from_env<DB: Database + Sync + 'static>(
test_name: &str,
no_config: bool,
) -> TestCtx {
let service_name = format!("{}-test--{}", rivet_env::service_name(), test_name);

let ray_id = Uuid::new_v4();
let config = rivet_config::Config::load::<String>(&[]).await.unwrap();
let pools = rivet_pools::Pools::new(config.clone())
.await
.expect("failed to create pools");
let (config, pools) = if no_config {
let config = rivet_config::Config::from_root(rivet_config::config::Root::default());
let pools = rivet_pools::Pools::test(config.clone())
.await
.expect("failed to create pools");

(config, pools)
} else {
let config = rivet_config::Config::load::<String>(&[]).await.unwrap();
let pools = rivet_pools::Pools::new(config.clone())
.await
.expect("failed to create pools");

(config, pools)
};
let shared_client = chirp_client::SharedClient::from_env(pools.clone())
.expect("failed to create chirp client");
let cache =
Expand All @@ -68,10 +82,7 @@ impl TestCtx {
(),
);

let db = DatabaseCrdbNats::from_pools(
pools.crdb().unwrap(),
pools.nats_option().clone().unwrap(),
);
let db = DB::from_pools(pools).unwrap();
let msg_ctx = MessageCtx::new(&conn, ray_id).await.unwrap();

TestCtx {
Expand Down Expand Up @@ -224,6 +235,10 @@ impl TestCtx {
self.conn.cache_handle()
}

pub fn pools(&self) -> &rivet_pools::Pools {
self.conn.pools()
}

pub async fn crdb(&self) -> Result<CrdbPool, rivet_pools::Error> {
self.conn.crdb().await
}
Expand Down
20 changes: 16 additions & 4 deletions packages/common/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ impl WorkflowCtx {
interval.tick().await;

// Write output
if let Err(err) = self.db.commit_workflow(self.workflow_id, &output).await {
if let Err(err) = self
.db
.complete_workflow(self.workflow_id, &self.name, &output)
.await
{
if retries > MAX_DB_ACTION_RETRIES {
return Err(err);
}
Expand Down Expand Up @@ -213,8 +217,9 @@ impl WorkflowCtx {
// Write output
let res = self
.db
.fail_workflow(
.commit_workflow(
self.workflow_id,
&self.name,
false,
deadline_ts,
wake_signals,
Expand Down Expand Up @@ -250,6 +255,7 @@ impl WorkflowCtx {

let ctx = ActivityCtx::new(
self.workflow_id,
self.name.clone(),
self.db.clone(),
&self.config,
&self.conn,
Expand Down Expand Up @@ -446,10 +452,10 @@ impl WorkflowCtx {
loop {
interval.tick().await;

// Check if state finished
// Check if workflow completed
let workflow = self
.db
.get_workflow(sub_workflow_id)
.get_sub_workflow(self.workflow_id, &self.name, sub_workflow_id)
.await
.map_err(GlobalError::raw)?
.ok_or(WorkflowError::WorkflowNotFound)
Expand Down Expand Up @@ -788,6 +794,8 @@ impl WorkflowCtx {
&loop_location,
self.version,
0,
// TODO:
&serde_json::value::RawValue::from_string("null".to_string())?,
None,
self.loop_location(),
)
Expand Down Expand Up @@ -851,6 +859,8 @@ impl WorkflowCtx {
&loop_location,
self.version,
iteration,
// TODO:
&serde_json::value::RawValue::from_string("null".to_string())?,
None,
self.loop_location(),
)
Expand All @@ -875,6 +885,8 @@ impl WorkflowCtx {
&loop_location,
self.version,
iteration,
// TODO:
&serde_json::value::RawValue::from_string("null".to_string())?,
Some(&output_val),
self.loop_location(),
)
Expand Down
Loading

0 comments on commit 9a43f01

Please sign in to comment.