Skip to content

Commit

Permalink
chore(cleanup)
Browse files Browse the repository at this point in the history
  • Loading branch information
thoven87 committed Sep 5, 2024
1 parent bba72f1 commit 8347bdc
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 47 deletions.
46 changes: 7 additions & 39 deletions Sources/Jobs/JobExecutionOptions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,13 @@ public struct JobExecutionOptions: ExpressibleByArrayLiteral, Sendable {
public struct Option: Sendable {
internal enum _Option {
case delayed(until: Date)
case unknown
}

internal let raw: _Option
/// Delay a job for later execution
public static func delay(until date: Date) -> Option {
Option(raw: .delayed(until: date))
}

/// Default job option
public static func unknown() -> Option {
Option(raw: .unknown)
}

/// Check if a job is delayed or not
public func isDelayed() -> Bool {
switch self.raw {
case .delayed:
return true
default:
return false
}
}

/// Default state for execution
public func isUnknown() -> Bool {
switch self.raw {
case .unknown:
return true
default:
return false
}
}

/// Return time for execution
public func delayedUntil() -> Date? {
switch self.raw {
case .delayed(until: let date):
return date
default:
return nil
}
}
}

private let _options: [Option]
Expand All @@ -78,8 +42,12 @@ public struct JobExecutionOptions: ExpressibleByArrayLiteral, Sendable {

/// Time for when a job should run
public func delayedUntil() -> Date? {
return self._options.first(where: {
$0.isDelayed()
})?.delayedUntil()
for option in self._options {
switch option.raw {
case .delayed(let date):
return date
}
}
return nil
}
}
4 changes: 2 additions & 2 deletions Sources/Jobs/JobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ public struct JobQueue<Queue: JobQueueDriver>: Service {
@discardableResult public func push<Parameters: Codable & Sendable>(
id: JobIdentifier<Parameters>,
parameters: Parameters,
executionOptions: JobExecutionOptions = [.unknown()]
options: JobExecutionOptions = []
) async throws -> Queue.JobID {
let jobRequest = EncodableJob(id: id, parameters: parameters, queuedAt: .now)
let buffer = try JSONEncoder().encodeAsByteBuffer(jobRequest, allocator: ByteBufferAllocator())
Meter(label: "swift_jobs_meter", dimensions: [("status", "queued")]).increment()
let id = try await self.queue.push(buffer, executionOptions: executionOptions)
let id = try await self.queue.push(buffer, options: options)
self.logger.debug(
"Pushed Job",
metadata: ["JobID": .stringConvertible(id), "JobName": .string(jobRequest.id.name)]
Expand Down
4 changes: 2 additions & 2 deletions Sources/Jobs/JobQueueDriver.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public protocol JobQueueDriver: AsyncSequence, Sendable where Element == QueuedJ
/// Called when JobQueueHandler is initialised with this queue
func onInit() async throws
/// Push Job onto queue
/// - delayUntil: When to run a job
/// - options: JobExecutionOptions
/// - Returns: Identifier of queued job
func push(_ buffer: ByteBuffer, executionOptions: JobExecutionOptions) async throws -> JobID
func push(_ buffer: ByteBuffer, options: JobExecutionOptions) async throws -> JobID
/// This is called to say job has finished processing and it can be deleted
func finished(jobId: JobID) async throws
/// This is called to say job has failed to run and should be put aside
Expand Down
4 changes: 2 additions & 2 deletions Sources/Jobs/MemoryJobQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public final class MemoryQueue: JobQueueDriver {
/// - job: Job
/// - eventLoop: Eventloop to run process on (ignored in this case)
/// - Returns: Queued job
@discardableResult public func push(_ buffer: ByteBuffer, executionOptions: JobExecutionOptions) async throws -> JobID {
return try await self.queue.push(buffer, executionOptions: executionOptions)
@discardableResult public func push(_ buffer: ByteBuffer, options: JobExecutionOptions) async throws -> JobID {
return try await self.queue.push(buffer, executionOptions: options)
}

public func finished(jobId: JobID) async throws {
Expand Down
4 changes: 2 additions & 2 deletions Tests/JobsTests/JobsTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ final class JobsTests: XCTestCase {
}
try await self.testJobQueue(jobQueue) {
delayedJob.wrappingIncrement(by: 1, ordering: .relaxed)
try await jobQueue.push(id: job1, parameters: 0, executionOptions: [
.delay(until: Date.now.addingTimeInterval(5)),
try await jobQueue.push(id: job1, parameters: 0, options: [
.delay(until: Date.now.addingTimeInterval(1)),
])
delayedJob.wrappingIncrement(by: 1, ordering: .relaxed)
try await jobQueue.push(id: job2, parameters: 10)
Expand Down

0 comments on commit 8347bdc

Please sign in to comment.