Skip to content

Commit

Permalink
fix(subs): key collision on redis (#865)
Browse files Browse the repository at this point in the history
Follow-up to #863.
When multiple starts occur on the Redis backend, some schedules can fall at
**exactly** the same time, resulting in the same identifier (and
leading to an inconsistent state).

This fix combines the `schedule` with the `run_id` so that the resulting
reference key is unique, instead of using the `schedule` alone as the key.

```gql
mutation AllAtOnce {
  a: start_retry(kwargs: { .. }) # => calls add_schedule( ... date ...)
  b: start_retry(kwargs: { .. })
  c: start_retry(kwargs: { .. })
  d: start_retry(kwargs: { .. }) 
  e: start_retry(kwargs: { .. })
  f: start_retry(kwargs: { .. })
 # ..
}
```

#### Migration notes

None

- [ ] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
michael-0acf4 authored Oct 4, 2024
1 parent bbef981 commit daec029
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 23 deletions.
23 changes: 13 additions & 10 deletions src/substantial/src/backends/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl super::BackendStore for RedisBackend {
let non_prefixed_sched_ref = schedule.to_rfc3339();
let sched_score = 1.0 / (schedule.timestamp() as f64);
let sched_key = self.key(&[&non_prefixed_sched_ref, &run_id])?;
let sched_ref = self.key(&[&non_prefixed_sched_ref])?;
let sched_ref = self.key(&["ref_", &run_id, &non_prefixed_sched_ref])?;

let script = Script::new(
r#"
Expand Down Expand Up @@ -194,7 +194,7 @@ impl super::BackendStore for RedisBackend {
let q_key: String = self.key(&["schedules", &queue])?;
let non_prefixed_sched_ref = schedule.to_rfc3339();
let sched_key = self.key(&[&non_prefixed_sched_ref, &run_id])?;
let sched_ref = self.key(&[&non_prefixed_sched_ref])?;
let sched_ref = self.key(&["ref_", &run_id, &non_prefixed_sched_ref])?;

let script = Script::new(
r#"
Expand Down Expand Up @@ -232,10 +232,10 @@ impl super::BackendAgent for RedisBackend {
r#"
local q_key = KEYS[1]
local excludes = ARGV
local schedule_keys = redis.call("ZRANGE", q_key, 0, -1)
local schedule_refs = redis.call("ZRANGE", q_key, 0, -1)
for _, schedule_key in ipairs(schedule_keys) do
local run_ids = redis.call("ZRANGE", schedule_key, 0, -1)
for _, schedule_ref in ipairs(schedule_refs) do
local run_ids = redis.call("ZRANGE", schedule_ref, 0, -1)
for _, run_id in ipairs(run_ids) do
local is_excluded = false
for k = 1, #excludes do
Expand All @@ -246,7 +246,7 @@ impl super::BackendAgent for RedisBackend {
end
if not is_excluded then
return {run_id, schedule_key}
return {run_id, schedule_ref}
end
end
end
Expand All @@ -257,16 +257,19 @@ impl super::BackendAgent for RedisBackend {

let lua_ret: Option<(String, String)> = script.key(q_key).arg(excludes).invoke(r)?;

if let Some((run_id, schedule_key)) = lua_ret {
if let Some((run_id, schedule_ref)) = lua_ret {
let schedule = self
.parts(&schedule_key)?
.parts(&schedule_ref)?
.last()
.cloned()
.with_context(|| format!("Invalid key {:?}", schedule_key))?;
.with_context(|| format!("Invalid key {:?}", schedule_ref))?;
println!("{:?}", schedule_ref);

return Ok(Some(NextRun {
run_id,
schedule_date: DateTime::parse_from_rfc3339(&schedule)?.to_utc(),
schedule_date: DateTime::parse_from_rfc3339(&schedule)
.with_context(|| format!("Parsing {:?}", schedule))?
.to_utc(),
}));
}

Expand Down
12 changes: 0 additions & 12 deletions src/typegate/src/runtimes/substantial/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,7 @@ export class Agent {
) {}

async schedule(input: AddScheduleInput) {
// FIXME:
// This function is triggered by the user (start, event, stop)
// Using async rust in here can be tricky, one issue for example is that
// concurrent calls fail silently without panics or even exceptions on the Redis Backend
// mutation {
// one: start(..) # calls schedule(..)
// ..
// tenth: start(..) # calls schedule(..)
// }

await Meta.substantial.storeAddSchedule(input);
// This delay is completely unrelated to the rust side and solves the issue
await sleep(100);
}

async log(runId: string, schedule: string, content: unknown) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ export class WorkerManager {
logger.info(`trigger ${type} for ${runId}`);
}

/** Just as the name indicates, this will also decide to actually run it or not depending on the `storedRun` value */
triggerStart(
name: string,
runId: string,
Expand Down

0 comments on commit daec029

Please sign in to comment.