Skip to content

Commit

Permalink
Merge pull request #12 from SberMarket-Tech/upstream-commits
Browse files Browse the repository at this point in the history
Upstream commits
  • Loading branch information
hageshtrem authored Mar 7, 2023
2 parents 258415f + acf67a8 commit 7f03a5b
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 33 deletions.
49 changes: 34 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,21 @@ func (c *Context) Export(job *work.Job) error {
}
```

## Redis Cluster
If you're attempting to use gocraft/work on a `Redis Cluster` deployment, then you may encounter a `CROSSSLOT Keys in request don't hash to the same slot` error during the execution of the various lua scripts used to manage job data (see [Issue 93](https://github.com/gocraft/work/issues/93#issuecomment-401134340)). The current workaround is to force the keys for an entire `namespace` for a given worker pool on a single node in the cluster using [Redis Hash Tags](https://redis.io/topics/cluster-spec#keys-hash-tags). Using the example above:

```go
func main() {
// Make a new pool. Arguments:
// Context{} is a struct that will be the context for the request.
// 10 is the max concurrency
// "my_app_namespace" is the Redis namespace and the {} chars forces all of the keys onto a single node
// redisPool is a Redis pool
pool := work.NewWorkerPool(Context{}, 10, "{my_app_namespace}", redisPool)
```
*Note* this is not an issue for Redis Sentinel deployments.
## Special Features
### Contexts
Expand Down Expand Up @@ -204,16 +219,32 @@ job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "78
### Periodic Enqueueing (Cron)
You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The [scheduling specification](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) uses a Cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and week of the day, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
You can periodically enqueue jobs on your gocraft/work cluster using your worker
pool. The [scheduling specification](https://pkg.go.dev/github.com/robfig/cron/v3#hdr-CRON_Expression_Format)
uses a Cron syntax where the fields represent an optional seconds, minutes, hours,
day of the month, month, and week of the day, respectively. Even if you have multiple
worker pools on different machines, they'll all coordinate and only enqueue your
job once.
**NOTE:** Use only the asterisk format as in the example below, not @every. In this case, each job has a predictable byte format and will be deduplicated.
**NOTE:** Use only the asterisk format as in the example below, not @every. In
this case, each job has a predictable byte format and will be deduplicated.
```go
pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
pool.PeriodicallyEnqueue("0 0 * * * *", "calculate_caches") // This will enqueue a "calculate_caches" job every hour
pool.Job("calculate_caches", (*Context).CalculateCaches) // Still need to register a handler for this job separately
```
## Job concurrency
You can control job concurrency using `JobOptions{MaxConcurrency: <num>}`. Unlike the WorkerPool concurrency, this controls the limit on the number jobs of that type that can be active at one time by within a single redis instance. This works by putting a precondition on enqueuing function, meaning a new job will not be scheduled if we are at or over a job's `MaxConcurrency` limit. A redis key (see `redis.go::redisKeyJobsLock`) is used as a counting semaphore in order to track job concurrency per job type. The default value is `0`, which means "no limit on job concurrency".
**Note:** if you want to run jobs "single threaded" then you can set the `MaxConcurrency` accordingly:
```go
worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)
```
## Run the Web UI
The web UI provides a view to view the state of your gocraft/work cluster, inspect queued jobs, and retry or delete dead jobs.
Expand Down Expand Up @@ -299,23 +330,11 @@ You'll see a view that looks like this:
* Each worker pool will wake up every 2 minutes, and if jobs haven't been scheduled yet, it will schedule all the jobs that would be executed in the next five minutes.
* Each periodic job that runs at a given time has a predictable byte pattern. Since jobs are scheduled on the scheduled job queue (a Redis z-set), if the same job is scheduled twice for a given time, it can only exist in the z-set once.

## Paused jobs
### Paused jobs

* You can pause jobs from being processed from a specific queue by setting a "paused" redis key (see `redisKeyJobsPaused`)
* Conversely, jobs in the queue will resume being processed once the paused redis key is removed

## Job concurrency

* You can control job concurrency using `JobOptions{MaxConcurrency: <num>}`.
* Unlike the WorkerPool concurrency, this controls the limit on the number jobs of that type that can be active at one time by within a single redis instance
* This works by putting a precondition on enqueuing function, meaning a new job will not be scheduled if we are at or over a job's `MaxConcurrency` limit
* A redis key (see `redisKeyJobsLock`) is used as a counting semaphore in order to track job concurrency per job type
* The default value is `0`, which means "no limit on job concurrency"
* **Note:** if you want to run jobs "single threaded" then you can set the `MaxConcurrency` accordingly:
```go
worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)
```

### Terminology reference
* "worker pool" - a pool of workers
* "worker" - an individual worker in a single goroutine. Gets a job from redis, does job, gets next job...
Expand Down
6 changes: 5 additions & 1 deletion benches/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/garyburd/redigo v1.6.3 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/trace v1.11.2 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect
vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible // indirect
)
40 changes: 35 additions & 5 deletions benches/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,28 @@ github.com/cihub/seelog v0.0.0-20140730094913-72ae425987bc/go.mod h1:9d6lWj8KzO/
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39 h1:O0YTztXI3XeJXlFhSo4wNb0VBVqSgT+hi/CjNWKvMnY=
github.com/customerio/gospec v0.0.0-20130710230057-a5cc0e48aa39/go.mod h1:OzYUFhPuL2JbjwFwrv6CZs23uBawekc6OZs+g19F0mY=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5 h1:RAV05c0xOkJ3dZGS0JFybxFKZ2WMLabgx3uXnd7rpGs=
github.com/dchest/uniuri v0.0.0-20200228104902-7aecb25e1fe5/go.mod h1:GgB8SF9nRG+GqaDtLcwJZsQFhcogVCJ79j4EdT0c2V4=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/garyburd/redigo v1.6.3 h1:HCeeRluvAgMusMomi1+6Y5dmFOdYV/JzoRrrbFlkGIc=
github.com/garyburd/redigo v1.6.3/go.mod h1:rTb6epsqigu3kYKBnaF028A7Tf/Aw5s0cqA47doKKqw=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0 h1:pKjeDsx7HGGbjr7VGI1HksxDJqSjaGED3cSw9GeSI98=
github.com/gocraft/health v0.0.0-20170925182251-8675af27fef0/go.mod h1:rWibcVfwbUxi/QXW84U7vNTcIcZFd6miwbt8ritxh/Y=
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b/go.mod h1:Ag7UMbZNGrnHwaXPJOUKJIVgx4QOWMOWZngrvsN6qak=
github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/gomodule/redigo v1.8.8 h1:f6cXq6RRfiyrOJEV7p3JhLDlmawGBVBBP1MggY8Mo4E=
github.com/gomodule/redigo v1.8.8/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb h1:y9LFhCM3gwK94Xz9/h7GcSVLteky9pFHEkP04AqQupA=
github.com/jrallison/go-workers v0.0.0-20180112190529-dbf81d0b75bb/go.mod h1:ziQRRNHCWZe0wVNzF8y8kCWpso0VMpqHJjB19DSenbE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand All @@ -35,16 +43,33 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0=
go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI=
go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU=
go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU=
go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0=
go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand All @@ -53,12 +78,17 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 h1:h+EGohizhe9XlX18rfpa8k8RAc5XyaeamM+0VHRd4lc=
golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible h1:TCG4ZGCiFNr7XGP8nhT++5Wwi1jRC6Xk9IPxZiBQXB0=
vitess.io/vitess v3.0.0-rc.3.0.20181212200900-e2c5239f54d1+incompatible/go.mod h1:h4qvkyNYTOC0xI+vcidSWoka0gQAZc9ZPHbkHo48gP0=
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/braintree/manners v0.0.0-20160418043613-82a8879fc5fd
github.com/gocraft/web v0.0.0-20190207150652-9707327fb69b
github.com/gomodule/redigo v1.8.8
github.com/robfig/cron v1.2.0
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.8.1
go.opentelemetry.io/otel v1.11.2
go.opentelemetry.io/otel/sdk v1.11.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWb
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
2 changes: 1 addition & 1 deletion periodic_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"

"github.com/gomodule/redigo/redis"
"github.com/robfig/cron"
"github.com/robfig/cron/v3"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions periodic_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

"github.com/gomodule/redigo/redis"
"github.com/robfig/cron"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -110,7 +110,7 @@ func TestPeriodicEnqueuerSpawn(t *testing.T) {
}

func appendPeriodicJob(pjs []*periodicJob, spec, jobName string) []*periodicJob {
sched, err := cron.Parse(spec)
sched, err := cron.NewParser(cronFormat).Parse(spec)
if err != nil {
panic(err)
}
Expand Down
4 changes: 2 additions & 2 deletions requeuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package work
import (
"testing"

"github.com/robfig/cron"
"github.com/robfig/cron/v3"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -88,7 +88,7 @@ func TestRequeuePeriodic(t *testing.T) {

jobName := "clean"
jobSpec := "*/1 * * * * *"
shedule, err := cron.Parse(jobSpec)
shedule, err := cron.NewParser(cronFormat).Parse(jobSpec)
assert.NoError(t, err)

jobs := []*periodicJob{
Expand Down
10 changes: 6 additions & 4 deletions worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"strings"
"sync"

"github.com/robfig/cron"
"github.com/robfig/cron/v3"
)

const cronFormat = cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor

// WorkerPool represents a pool of workers. It forms the primary API of gocraft/work. WorkerPools provide the public API of gocraft/work. You can attach jobs and middlware to them. You can start and stop them. Based on their concurrency setting, they'll spin up N worker goroutines.
type WorkerPool struct {
workerPoolID string
Expand Down Expand Up @@ -192,11 +194,11 @@ func (wp *WorkerPool) JobWithOptions(name string, jobOpts JobOptions, fn interfa
}

// PeriodicallyEnqueue will periodically enqueue jobName according to the cron-based spec.
// The spec format is based on https://godoc.org/github.com/robfig/cron, which is a relatively standard cron format.
// Note that the first value is the seconds!
// The spec format is based on github.com/robfig/cron/v3, which is a relatively standard cron format.
// Note that the first value can be seconds!
// If you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.
func (wp *WorkerPool) PeriodicallyEnqueue(spec string, jobName string) *WorkerPool {
schedule, err := cron.Parse(spec)
schedule, err := cron.NewParser(cronFormat).Parse(spec)
if err != nil {
panic(err)
}
Expand Down

0 comments on commit 7f03a5b

Please sign in to comment.