-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjob.go
156 lines (125 loc) · 3.1 KB
/
job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package gokogeri
import (
"encoding/json"
"fmt"
"time"
"github.com/kapvode/gokogeri/internal/sidekiq"
)
// redisJob represents the job data as it is encoded in Redis.
//
// The json struct tags define the key names used when a job is marshaled by
// Job.encode and unmarshaled by newJobFromJSON.
type redisJob struct {
	// Class is the name of the class that implements the job.
	Class string `json:"class"`
	// Queue is the name of the queue; setDefaults fills in "default" when it is empty.
	Queue string `json:"queue"`
	// Args holds the job arguments; the key is omitted from the JSON when the slice is empty.
	Args []interface{} `json:"args,omitempty"`
	// Retry is the retry policy. retryValue is declared outside of this file.
	Retry retryValue `json:"retry"`
	// JobID is the job identifier; setDefaults generates one with sidekiq.JobID when it is empty.
	JobID string `json:"jid"`
	// CreatedAt and EnqueuedAt are numeric timestamps written with sidekiq.Time and
	// read back with sidekiq.ToTime.
	// NOTE(review): presumably fractional Unix epoch seconds — confirm against the sidekiq package.
	CreatedAt float64 `json:"created_at"`
	EnqueuedAt float64 `json:"enqueued_at"`
}
// Job describes a single job. It wraps the Redis-encoded representation and
// keeps the timestamps as time.Time values alongside it.
type Job struct {
	// enc is the representation that encode marshals to JSON.
	enc redisJob
	// createdAt and enqueuedAt are the time.Time counterparts of enc.CreatedAt and
	// enc.EnqueuedAt. They are filled in by newJobFromJSON when decoding, and
	// written back into enc by setDefaults.
	createdAt time.Time
	enqueuedAt time.Time
	// customRetryPolicy is set by SetRetry and SetRetryTimes. While it is false,
	// setDefaults enables retries.
	customRetryPolicy bool
}
// newJobFromJSON decodes the JSON representation of a job into a new Job.
//
// The encoded created_at and enqueued_at values are converted with
// sidekiq.ToTime so that they are available through CreatedAt and EnqueuedAt.
//
// If data cannot be decoded, it returns a nil Job and an error that wraps the
// underlying json error, so callers can inspect it with errors.Is or errors.As.
func newJobFromJSON(data []byte) (*Job, error) {
	job := &Job{}
	if err := json.Unmarshal(data, &job.enc); err != nil {
		// %w (rather than %v) keeps the original error in the chain.
		return nil, fmt.Errorf("decoding job json: %w", err)
	}

	job.createdAt = sidekiq.ToTime(job.enc.CreatedAt)
	job.enqueuedAt = sidekiq.ToTime(job.enc.EnqueuedAt)
	return job, nil
}
// ID returns the job's identifier, which is encoded under the "jid" key.
func (j *Job) ID() string {
	jid := j.enc.JobID
	return jid
}
// SetID assigns the job's identifier and returns the job so that calls can be chained.
// If the identifier is left empty, setDefaults generates one.
func (j *Job) SetID(id string) *Job {
	enc := &j.enc
	enc.JobID = id
	return j
}
// Class reports the name of the Ruby class that implements the job.
func (j *Job) Class() string {
	class := j.enc.Class
	return class
}
// SetClass assigns the name of the class that implements the job and returns
// the job so that calls can be chained.
func (j *Job) SetClass(c string) *Job {
	enc := &j.enc
	enc.Class = c
	return j
}
// Queue returns the name of the queue the job belongs to.
func (j *Job) Queue() string {
	queue := j.enc.Queue
	return queue
}
// SetQueue assigns the name of the queue and returns the job so that calls can
// be chained. If the name is left empty, setDefaults uses "default".
func (j *Job) SetQueue(q string) *Job {
	enc := &j.enc
	enc.Queue = q
	return j
}
// Args returns the job's arguments.
// The returned slice is the one held by the job, not a copy.
func (j *Job) Args() []interface{} {
	args := j.enc.Args
	return args
}
// SetArgs assigns the job's arguments and returns the job so that calls can be
// chained. The slice is stored as given, without copying it.
func (j *Job) SetArgs(args []interface{}) *Job {
	enc := &j.enc
	enc.Args = args
	return j
}
// CreatedAt returns the time at which the job was created.
func (j *Job) CreatedAt() time.Time {
	created := j.createdAt
	return created
}
// SetCreatedAt assigns the job's creation time and returns the job so that
// calls can be chained. If the creation time is left at its zero value,
// setDefaults uses the current time.
func (j *Job) SetCreatedAt(t time.Time) *Job {
	job := j
	job.createdAt = t
	return job
}
// EnqueuedAt returns the time at which the job was enqueued. The value is set
// when a job is decoded by newJobFromJSON or when setDefaults runs.
func (j *Job) EnqueuedAt() time.Time {
	enqueued := j.enqueuedAt
	return enqueued
}
// Retry tells whether a failed job should be retried.
func (j *Job) Retry() bool {
	enabled := j.enc.Retry.ok
	return enabled
}
// SetRetry turns retrying of a failed job on or off and returns the job so
// that calls can be chained.
//
// It also records that the retry policy was chosen explicitly, which stops
// setDefaults from enabling retries on its own.
func (j *Job) SetRetry(retry bool) *Job {
	j.customRetryPolicy, j.enc.Retry.ok = true, retry
	return j
}
// RetryTimes reports how many times the job should be retried. A result of 0
// means that the default number of retries should be used.
func (j *Job) RetryTimes() int {
	times := j.enc.Retry.times
	return times
}
// SetRetryTimes sets how many times the job should be retried and returns the
// job so that calls can be chained.
//
// Only values from 0 to 100 (inclusive) are accepted; any other value leaves
// the job unchanged.
//
// An accepted value always enables retries, because 0 stands for the default
// number of retries rather than "never retry". To disable retries, use
// SetRetry(false).
func (j *Job) SetRetryTimes(n int) *Job {
	if n < 0 || n > 100 {
		// Out of range: ignore the request.
		return j
	}

	j.customRetryPolicy = true
	retry := &j.enc.Retry
	retry.ok = true
	retry.times = n
	return j
}
// setDefaults fills in the fields of the job that have not been set and
// synchronizes the encoded timestamps with their time.Time counterparts.
//
// It does the following:
//   - uses the queue "default" when no queue was set;
//   - sets the enqueue time to the current time;
//   - sets the creation time to the current time when it is still zero;
//   - generates a job ID with sidekiq.JobID when none was set;
//   - enables retries unless SetRetry or SetRetryTimes was called.
//
// It returns an error only if generating the job ID fails. The error wraps the
// underlying cause, so callers can inspect it with errors.Is or errors.As. The
// other defaults have already been applied to the job by then.
func (j *Job) setDefaults() error {
	if j.enc.Queue == "" {
		j.enc.Queue = "default"
	}

	// Use a single reading of the clock so that a job without an explicit
	// creation time gets identical created and enqueued timestamps.
	now := time.Now()

	j.enqueuedAt = now
	j.enc.EnqueuedAt = sidekiq.Time(j.enqueuedAt)

	if j.createdAt.IsZero() {
		j.createdAt = now
	}
	j.enc.CreatedAt = sidekiq.Time(j.createdAt)

	if j.enc.JobID == "" {
		var err error
		j.enc.JobID, err = sidekiq.JobID()
		if err != nil {
			// %w (rather than %v) keeps the original error in the chain.
			return fmt.Errorf("create job ID: %w", err)
		}
	}

	if !j.customRetryPolicy {
		j.enc.Retry.ok = true
	}

	return nil
}
// encode marshals the job's Redis representation to JSON.
func (j *Job) encode() ([]byte, error) {
	data, err := json.Marshal(j.enc)
	return data, err
}