diff --git a/src/substantial/src/backends/redis.rs b/src/substantial/src/backends/redis.rs index 394cfa6a6c..024d370341 100644 --- a/src/substantial/src/backends/redis.rs +++ b/src/substantial/src/backends/redis.rs @@ -125,7 +125,7 @@ impl super::BackendStore for RedisBackend { let non_prefixed_sched_ref = schedule.to_rfc3339(); let sched_score = 1.0 / (schedule.timestamp() as f64); let sched_key = self.key(&[&non_prefixed_sched_ref, &run_id])?; - let sched_ref = self.key(&[&non_prefixed_sched_ref])?; + let sched_ref = self.key(&["ref_", &run_id, &non_prefixed_sched_ref])?; let script = Script::new( r#" @@ -194,7 +194,7 @@ impl super::BackendStore for RedisBackend { let q_key: String = self.key(&["schedules", &queue])?; let non_prefixed_sched_ref = schedule.to_rfc3339(); let sched_key = self.key(&[&non_prefixed_sched_ref, &run_id])?; - let sched_ref = self.key(&[&non_prefixed_sched_ref])?; + let sched_ref = self.key(&["ref_", &run_id, &non_prefixed_sched_ref])?; let script = Script::new( r#" @@ -232,10 +232,10 @@ impl super::BackendAgent for RedisBackend { r#" local q_key = KEYS[1] local excludes = ARGV - local schedule_keys = redis.call("ZRANGE", q_key, 0, -1) + local schedule_refs = redis.call("ZRANGE", q_key, 0, -1) - for _, schedule_key in ipairs(schedule_keys) do - local run_ids = redis.call("ZRANGE", schedule_key, 0, -1) + for _, schedule_ref in ipairs(schedule_refs) do + local run_ids = redis.call("ZRANGE", schedule_ref, 0, -1) for _, run_id in ipairs(run_ids) do local is_excluded = false for k = 1, #excludes do @@ -246,7 +246,7 @@ impl super::BackendAgent for RedisBackend { end if not is_excluded then - return {run_id, schedule_key} + return {run_id, schedule_ref} end end end @@ -257,16 +257,19 @@ impl super::BackendAgent for RedisBackend { let lua_ret: Option<(String, String)> = script.key(q_key).arg(excludes).invoke(r)?; - if let Some((run_id, schedule_key)) = lua_ret { + if let Some((run_id, schedule_ref)) = lua_ret { let schedule = self - .parts(&schedule_key)? + .parts(&schedule_ref)? .last() .cloned() - .with_context(|| format!("Invalid key {:?}", schedule_key))?; + .with_context(|| format!("Invalid key {:?}", schedule_ref))?; + println!("{:?}", schedule_ref); return Ok(Some(NextRun { run_id, - schedule_date: DateTime::parse_from_rfc3339(&schedule)?.to_utc(), + schedule_date: DateTime::parse_from_rfc3339(&schedule) + .with_context(|| format!("Parsing {:?}", schedule))? + .to_utc(), })); } diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index b72b9530a7..4857a0b622 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -45,19 +45,7 @@ export class Agent { ) {} async schedule(input: AddScheduleInput) { - // FIXME: - // This function is triggered by the user (start, event, stop) - // Using async rust in here can be tricky, one issue for example is that - // concurrent calls fail silently without panics or even exceptions on the Redis Backend - // mutation { - // one: start(..) # calls schedule(..) - // .. - // tenth: start(..) # calls schedule(..) - // } - await Meta.substantial.storeAddSchedule(input); - // This delay is completely unrelated to the rust side and solves the issue - await sleep(100); } async log(runId: string, schedule: string, content: unknown) { diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index 3f2ef3c0ec..bc6000e0c2 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -197,7 +197,6 @@ export class WorkerManager { logger.info(`trigger ${type} for ${runId}`); } - /** Just as the name indicates, this will also decide to actually run it or not depending on the `storedRun` value */ triggerStart( name: string, runId: string,