Skip to content

Commit

Permalink
Prevent a decoding error from bringing down the requeue system
Browse files Browse the repository at this point in the history
  • Loading branch information
Joannis committed Apr 14, 2023
1 parent 39e4021 commit 0ac756f
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions Sources/MongoQueue/MongoQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -335,25 +335,39 @@ public final class MongoQueue {
}

private func findAndRequeueStaleTasks() async throws {
for type in knownTypes {
let executingTasks = try await collection.find(
"category" == type.category && "status" == TaskStatus.executing.raw.rawValue
).decode(TaskModel.self).execute()

for try await task in executingTasks {
if
let lastUpdateDate = task.execution?.lastUpdate,
lastUpdateDate.addingTimeInterval(task.maxTaskDuration) <= Date()
{
logger.info("Dequeueing stale task id \(task._id) of type \(task.category)")
_ = try await collection.findOneAndUpdate(where: "_id" == task._id, to: [
"$set": [
"status": TaskStatus.scheduled.raw.rawValue,
"execution": Null()
] as Document
]).execute()
try await withThrowingTaskGroup(of: Void.self) { group in
for type in knownTypes {
group.addTask {
let executingTasks = self.collection.find(
"category" == type.category && "status" == TaskStatus.executing.raw.rawValue
).decode(TaskModel.self)

for try await task in executingTasks {
if
let lastUpdateDate = task.execution?.lastUpdate,
lastUpdateDate.addingTimeInterval(task.maxTaskDuration) <= Date()
{
async let _ = await self.requeueStaleTask(task)
}
}
}
}

try await group.waitForAll()
}
}

private func requeueStaleTask(_ task: TaskModel) async {
self.logger.info("Dequeueing stale task id \(task._id) of type \(task.category)")
do {
_ = try await self.collection.findOneAndUpdate(where: "_id" == task._id, to: [
"$set": [
"status": TaskStatus.scheduled.raw.rawValue,
"execution": Null()
] as Document
]).execute()
} catch {
self.logger.error("Failed to dequeue stale task id \(task._id) of type \(task.category)")
}
}
}

0 comments on commit 0ac756f

Please sign in to comment.