forked from riverqueue/river
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinsert_opts.go
145 lines (128 loc) · 5.73 KB
/
insert_opts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package river
import (
"fmt"
"slices"
"time"
"github.com/riverqueue/river/rivertype"
)
// InsertOpts are optional settings for a new job which can be provided at job
// insertion time. These will override any default InsertOpts settings provided
// by JobArgsWithInsertOpts, as well as any global defaults.
type InsertOpts struct {
	// MaxAttempts is the maximum number of total attempts (including both the
	// original run and all retries) before a job is abandoned and set as
	// discarded.
	MaxAttempts int

	// Priority is the priority of the job, with 1 being the highest priority and
	// 4 being the lowest. When fetching available jobs to work, the highest
	// priority jobs will always be fetched before any lower priority jobs are
	// fetched. Note that if your workers are swamped with more high-priority
	// jobs than they can handle, lower priority jobs may not be fetched.
	//
	// Defaults to PriorityDefault.
	Priority int

	// Queue is the name of the job queue in which to insert the job.
	//
	// Defaults to QueueDefault.
	Queue string

	// ScheduledAt is a time in the future at which to schedule the job (i.e.
	// in cases where it shouldn't be run immediately). The job is guaranteed
	// not to run before this time, but may run slightly after depending on the
	// number of other scheduled jobs and how busy the queue is.
	//
	// Use of this option generally only makes sense when passing options into
	// Insert rather than when a job args struct is implementing
	// JobArgsWithInsertOpts; however, it will work in both cases.
	ScheduledAt time.Time

	// Tags are an arbitrary list of keywords to add to the job. They have no
	// functional behavior and are meant entirely as a user-specified construct
	// to help group and categorize jobs.
	//
	// If tags are specified from both a job args override and from options on
	// Insert, the latter takes precedence. Tags are not merged.
	Tags []string

	// UniqueOpts are options relating to job uniqueness. An empty struct
	// avoids setting any worker-level unique options.
	UniqueOpts UniqueOpts
}
// UniqueOpts contains parameters for uniqueness for a job.
//
// When the options struct is uninitialized (its zero value) no uniqueness at
// all is enforced. As each property is initialized, it's added as a dimension
// on the uniqueness matrix, and with any property on, the job's kind always
// counts toward uniqueness.
//
// So for example, if only ByQueue is on, then for the given job kind, only a
// single instance is allowed in any given queue, regardless of other properties
// on the job. If both ByArgs and ByQueue are on, then for the given job kind, a
// single instance is allowed for each combination of args and queues. If either
// args or queue is changed on a new job, it's allowed to be inserted as a new
// job.
type UniqueOpts struct {
	// ByArgs indicates that uniqueness should be enforced for any specific
	// instance of encoded args for a job.
	//
	// Default is false, meaning that as long as any other unique property is
	// enabled, uniqueness will be enforced for a kind regardless of input args.
	ByArgs bool

	// ByPeriod defines uniqueness within a given period. On insert, the time
	// is rounded down to the nearest multiple of the given period, and a job
	// is only inserted if there isn't an existing job that will run between
	// then and the next multiple of the period.
	//
	// Default is no unique period, meaning that as long as any other unique
	// property is enabled, uniqueness will be enforced across all jobs of the
	// kind in the database, regardless of when they were scheduled.
	//
	// A non-zero period shorter than one second is rejected by validate.
	ByPeriod time.Duration

	// ByQueue indicates that uniqueness should be enforced within each queue.
	//
	// Default is false, meaning that as long as any other unique property is
	// enabled, uniqueness will be enforced for a kind across all queues.
	ByQueue bool

	// ByState indicates that uniqueness should be enforced across any of the
	// states in the given set. For example, if the given states were
	// `(scheduled, running)` then a new job inserted as `scheduled` would not
	// be inserted by virtue of it being not unique, but a new job marked as
	// `available` could be inserted.
	//
	// Unlike other unique options, ByState gets a default when it's not set for
	// user convenience. The default is equivalent to:
	//
	//	ByState: []river.JobState{river.JobStateAvailable, river.JobStateCompleted, river.JobStateRunning, river.JobStateRetryable, river.JobStateScheduled}
	//
	// With this setting, any jobs of the same kind that have been completed or
	// discarded, but not yet cleaned out by the system, won't count towards the
	// uniqueness of a new insert.
	//
	// NOTE(review): the default list above includes JobStateCompleted, which
	// appears to contradict the statement that completed jobs don't count
	// towards uniqueness. The default is applied outside this file; confirm
	// which of the two is intended.
	//
	// Every state listed here must be a member of jobStateAll or validate
	// returns an error.
	ByState []rivertype.JobState
}
// isEmpty reports whether the options are entirely unset, i.e. no uniqueness
// dimension has been configured.
//
// A direct comparison against `UniqueOpts{}` isn't possible because the struct
// holds a slice, which makes it non-comparable. The cost is that every field
// added to UniqueOpts has to be accounted for here by hand.
//
// Note that ByState is tested against nil specifically, so a non-nil slice of
// length zero still counts as "set".
func (o *UniqueOpts) isEmpty() bool {
	switch {
	case o.ByArgs, o.ByQueue:
		// Either boolean dimension being on means the options are in use.
		return false
	case o.ByPeriod != time.Duration(0):
		return false
	default:
		return o.ByState == nil
	}
}
// validate checks the unique options for values that can't be honored and
// returns a descriptive error for the first problem found, or nil if the
// options are acceptable. Options that are entirely unset are always valid.
func (o *UniqueOpts) validate() error {
	if o.isEmpty() {
		return nil
	}

	// A zero period means "not set"; anything else must be at least a second.
	if period := o.ByPeriod; period != time.Duration(0) && period < time.Second {
		return fmt.Errorf("JobUniqueOpts.ByPeriod should not be less than 1 second")
	}

	// The state type is backed by a string, so nothing stops a caller from
	// supplying an arbitrary value. Check each one against the known set. A
	// linear scan is used on purpose: the set is tiny, so a map lookup is
	// unlikely to be any faster.
	for i := range o.ByState {
		if slices.Contains(jobStateAll, o.ByState[i]) {
			continue
		}
		return fmt.Errorf("JobUniqueOpts.ByState contains invalid state %q", o.ByState[i])
	}

	return nil
}