-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathworker.service.ts
112 lines (94 loc) · 2.98 KB
/
worker.service.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import { Inject, Injectable, Logger } from "@nestjs/common";
import {
Job,
Runner,
RunnerOptions,
TaskSpec,
WorkerUtils,
makeWorkerUtils,
run,
runOnce,
} from "graphile-worker";
import { RUNNER_OPTIONS_KEY } from "../interfaces/module-config.interfaces";
import { uniq } from "../utils/array.utils";
import { ListenerExplorerService } from "./listener-explorer.service";
import { TaskExplorerService } from "./task-explorer.service";
/**
 * Injectable facade over graphile-worker: starts runners, enqueues jobs and
 * forwards worker events to the listeners discovered by
 * `ListenerExplorerService`.
 */
@Injectable()
export class WorkerService {
  private readonly logger = new Logger(WorkerService.name);
  /** Set once `runMigrations()` has completed successfully. */
  private isMigrationDone = false;
  /**
   * Settles when the listeners have been attached to `options.events`.
   * Kept so that `run()` / `runOnce()` can wait for it instead of racing it.
   */
  private readonly eventsHooked: Promise<void>;

  constructor(
    @Inject(RUNNER_OPTIONS_KEY) private readonly options: RunnerOptions,
    private readonly listenerExplorerService: ListenerExplorerService,
    private readonly taskExplorerService: TaskExplorerService,
  ) {
    this.options.taskList = this.taskExplorerService.taskList;

    // A constructor cannot await, so keep the promise and attach a handler:
    // a failure is logged here (no unhandled rejection) and surfaces again
    // when `run()` / `runOnce()` await `eventsHooked`.
    this.eventsHooked = this.hookEvents();
    this.eventsHooked.catch((error: unknown) => {
      this.logger.error(
        "Failed to hook worker events",
        error instanceof Error ? error.stack : undefined,
      );
    });
  }

  /**
   * Runs until either stopped by a signal event like `SIGINT` or by calling the `stop()` method on the resolved object.
   *
   * The resolved `Runner` object has a number of helpers on it, see [Runner object](https://github.com/graphile/worker#runner-object) for more information.
   *
   * Waits for event listeners to be attached and for migrations to complete
   * before starting the runner.
   */
  async run(): Promise<Runner> {
    await this.eventsHooked;
    await this.runMigrations();
    this.logger.debug("Start runner");
    return await run(this.options);
  }

  /**
   * Runs until there are no runnable jobs left, and then resolve.
   *
   * Waits for event listeners to be attached and for migrations to complete
   * before starting the runner.
   */
  async runOnce(): Promise<void> {
    await this.eventsHooked;
    await this.runMigrations();
    this.logger.debug("Start runner");
    await runOnce(this.options);
  }

  /**
   * Creates a new `WorkerUtils` instance from the injected runner options.
   *
   * The caller owns the returned instance and is responsible for calling
   * `release()` on it when done.
   */
  getWorkerUtils(): Promise<WorkerUtils> {
    return makeWorkerUtils(this.options);
  }

  /**
   * Enqueues a single job.
   *
   * @param identifier - Name of the task to execute.
   * @param payload - Optional payload handed to the task.
   * @param spec - Optional graphile-worker task spec.
   * @returns The created job.
   */
  async addJob(
    identifier: string,
    payload?: unknown,
    spec?: TaskSpec,
  ): Promise<Job> {
    const [job] = await this.addJobs([{ identifier, payload, spec }]);
    return job;
  }

  /**
   * Enqueues several jobs sequentially, in the order given, using a single
   * `WorkerUtils` instance that is released afterwards (even on failure).
   *
   * @param jobs - Jobs to enqueue.
   * @returns The created jobs, in the same order as `jobs`.
   */
  async addJobs(
    jobs: Array<{ identifier: string; payload?: unknown; spec?: TaskSpec }>,
  ): Promise<Job[]> {
    const workerUtils = await this.getWorkerUtils();
    const createdJobs: Job[] = [];

    try {
      await workerUtils.migrate();

      for (const { identifier, payload, spec } of jobs) {
        const job = await workerUtils.addJob(identifier, payload, spec);
        createdJobs.push(job);
      }

      return createdJobs;
    } finally {
      await workerUtils.release();
    }
  }

  /**
   * Runs the graphile-worker migrations once per service instance.
   *
   * The flag is only set after `migrate()` succeeds, so a failed attempt is
   * retried on the next call.
   */
  private async runMigrations(): Promise<void> {
    if (this.isMigrationDone) {
      return;
    }

    this.logger.debug("Run migrations");

    const workerUtils = await this.getWorkerUtils();
    try {
      await workerUtils.migrate();
    } finally {
      await workerUtils.release();
    }

    this.isMigrationDone = true;
  }

  /**
   * Registers one handler per distinct listener event on `options.events`.
   * Each handler dispatches to every discovered listener for that event.
   * Does nothing when no event emitter was provided in the options.
   */
  private async hookEvents(): Promise<void> {
    await this.listenerExplorerService.ensureInitialized();

    const emitter = this.options.events;
    if (emitter === undefined) {
      return;
    }

    const events = this.listenerExplorerService.listeners.map(
      ({ event }) => event,
    );

    for (const event of uniq(events)) {
      // The listener list is filtered at emit time, not at registration time.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any -- argument shape depends on the event
      emitter.on(event, (...args: any[]) => {
        this.listenerExplorerService.listeners
          .filter(({ event: e }) => e === event)
          .forEach(({ callback }) => callback(...args));
      });
    }
  }
}