diff --git a/README.md b/README.md index 1fe9afe..ce5f51c 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,9 @@ This package heavily inspired by [hibiken/asynq](https://github.com/hibiken/asyn * [Status](#status) * [Features](#features) - * [Examples](https://github.com/choria-io/asyncjobs/wiki/Introductory-Golang-Walkthrough) + * Examples + * [Golang](https://github.com/choria-io/asyncjobs/wiki/Introductory-Golang-Walkthrough) + * [CLI](https://github.com/choria-io/asyncjobs/wiki/Introductory-CLI-Walkthrough) [![Go Reference](https://pkg.go.dev/badge/github.com/choria-io/asyncjobs.svg)](https://pkg.go.dev/github.com/choria-io/asyncjobs) [![Go Report Card](https://goreportcard.com/badge/github.com/choria-io/asyncjobs)](https://goreportcard.com/report/github.com/choria-io/asyncjobs) diff --git a/ajc/queue_command.go b/ajc/queue_command.go index 3e7820d..914d9e8 100644 --- a/ajc/queue_command.go +++ b/ajc/queue_command.go @@ -9,6 +9,7 @@ import ( "sort" "time" + "github.com/choria-io/asyncjobs" "github.com/dustin/go-humanize" "gopkg.in/alecthomas/kingpin.v2" ) @@ -22,6 +23,9 @@ type queueCommand struct { maxTries int maxTime time.Duration maxConcurrent int + memory bool + replicas int + discardOld bool } func configureQueueCommand(app *kingpin.Application) { @@ -29,6 +33,17 @@ func configureQueueCommand(app *kingpin.Application) { queues := app.Command("queues", "Manage Work Queues").Alias("q").Alias("queue") + add := queues.Command("new", "Creates a new Queue").Alias("add").Alias("n").Alias("a").Action(c.addAction) + add.Arg("queue", "Queue to Configure").Required().StringVar(&c.name) + add.Flag("age", "Sets the maximum age for entries to keep, 0s for unlimited").Default("0s").DurationVar(&c.maxAge) + add.Flag("entries", "Sets the maximum amount of entries to keep, 0 for unlimited").Default("0").IntVar(&c.maxEntries) + add.Flag("tries", "Maximum delivery attempts to allow per message, -1 for unlimited").Default(fmt.Sprintf("%d", asyncjobs.DefaultMaxTries)).IntVar(&c.maxTries) + add.Flag("run-time", "Maximum run-time to allow per task").Default(asyncjobs.DefaultJobRunTime.String()).DurationVar(&c.maxTime) + add.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default(fmt.Sprintf("%d", asyncjobs.DefaultQueueMaxConcurrent)).IntVar(&c.maxConcurrent) + add.Flag("memory", "Store the Queue in memory").BoolVar(&c.memory) + add.Flag("replicas", "Number of storage replicas to configure").Default("1").IntVar(&c.replicas) + add.Flag("discard-old", "When full, discard old entries").BoolVar(&c.discardOld) + queues.Command("list", "List Queues").Alias("ls").Action(c.lsAction) rm := queues.Command("delete", "Removes the entire work queue").Alias("rm").Action(c.rmAction) @@ -51,6 +66,42 @@ func configureQueueCommand(app *kingpin.Application) { cfg.Flag("concurrent", "Maximum concurrent jobs that can be ran").Default("-2").IntVar(&c.maxConcurrent) } +func (c *queueCommand) addAction(_ *kingpin.ParseContext) error { + err := prepare() + if err != nil { + return err + } + + _, err = admin.QueueInfo(c.name) + if err == nil { + return fmt.Errorf("queue %s already exist", c.name) + } + + queue := &asyncjobs.Queue{ + Name: c.name, + MaxAge: c.maxAge, + MaxEntries: c.maxEntries, + DiscardOld: c.discardOld, + MaxTries: c.maxTries, + MaxRunTime: c.maxTime, + MaxConcurrent: c.maxConcurrent, + } + + err = admin.PrepareQueue(queue, c.replicas, c.memory) + if err != nil { + return err + } + + nfo, err := admin.QueueInfo(c.name) + if err != nil { + return err + } + + showQueue(nfo) + + return nil +} + func (c *queueCommand) configureAction(_ *kingpin.ParseContext) error { err := prepare() if err != nil { diff --git a/ajc/task_command.go b/ajc/task_command.go index 366dec8..56a3b14 100644 --- a/ajc/task_command.go +++ b/ajc/task_command.go @@ -339,7 +339,7 @@ func (c *taskCommand) viewAction(_ *kingpin.ParseContext) error { } func (c *taskCommand) addAction(_ *kingpin.ParseContext) error { - err := prepare() + err := prepare(asyncjobs.BindWorkQueue(c.queue)) if err != nil { return err } diff --git a/ajc/util.go b/ajc/util.go index bff9ba6..65b9355 100644 --- a/ajc/util.go +++ b/ajc/util.go @@ -40,6 +40,12 @@ func prepare(copts ...asyncjobs.ClientOpt) error { conn := []nats.Option{ nats.MaxReconnects(10), nats.Name("Choria Asynchronous Jobs CLI version " + version), + nats.ReconnectHandler(func(nc *nats.Conn) { + log.Printf("Reconnected to NATS server %s", nc.ConnectedUrl()) + }), + nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Printf("Disconnected from server: %v", err) + }), nats.ErrorHandler(func(nc *nats.Conn, _ *nats.Subscription, err error) { url := nc.ConnectedUrl() if url == "" { @@ -51,7 +57,8 @@ func prepare(copts ...asyncjobs.ClientOpt) error { } opts := []asyncjobs.ClientOpt{ - asyncjobs.CustomLogger(log), asyncjobs.NatsContext(nctx, conn...), + asyncjobs.CustomLogger(log), + asyncjobs.NatsContext(nctx, conn...), } opts = append(opts, copts...) diff --git a/client_options.go b/client_options.go index 1c0a6ba..ac3da5e 100644 --- a/client_options.go +++ b/client_options.go @@ -132,6 +132,22 @@ func WorkQueue(queue *Queue) ClientOpt { } } +// BindWorkQueue binds the client to a work queue that should already exist +func BindWorkQueue(queue string) ClientOpt { + return func(opts *ClientOpts) error { + if queue == "" { + return fmt.Errorf("a queue name is required") + } + if opts.queue != nil { + return fmt.Errorf("a queue has already been defined") + } + + opts.queue = &Queue{Name: queue, NoCreate: true} + + return nil + } +} + // TaskRetention is the time tasks will be kept with. // // Used only when initially creating the underlying streams. diff --git a/go.mod b/go.mod index 87cc4e4..972a494 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/segmentio/ksuid v1.0.4 github.com/sirupsen/logrus v1.6.0 github.com/xlab/tablewriter v0.0.0-20160610135559-80b567a11ad5 - golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce + golang.org/x/term v0.0.0-20210503060354-a79de5458b56 gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) @@ -38,9 +38,9 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce // indirect golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2 // indirect golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect - golang.org/x/term v0.0.0-20210503060354-a79de5458b56 // indirect golang.org/x/text v0.3.6 // indirect golang.org/x/time v0.0.0-20211116232009-f0f3c7e86c11 // indirect google.golang.org/protobuf v1.26.0 // indirect diff --git a/go.sum b/go.sum index 62a7141..88c683a 100644 --- a/go.sum +++ b/go.sum @@ -192,10 +192,6 @@ github.com/nats-io/jsm.go v0.0.28-0.20220128163911-90cd1007b323/go.mod h1:HU1JmK github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296 h1:vU9tpM3apjYlLLeY23zRWJ9Zktr5jp+mloR942LEOpY= github.com/nats-io/jwt/v2 v2.2.1-0.20220113022732-58e87895b296/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= github.com/nats-io/nats-server/v2 v2.7.2-0.20220126224453-26b692ee73c0/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220130232407-6b690bd5b635 h1:h6+xCwResqI/W7AFR0E8wyYS89zXBBS85Qd0gf+iW/E= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220130232407-6b690bd5b635/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b h1:h8EYD8Q7yUbjXmMT6z1XI7SAV+aiHhkNEc1O+WImMh4= -github.com/nats-io/nats-server/v2 v2.7.2-0.20220131171338-74c0fdc3bb8b/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2 h1:/ocgZt+pxx9ocGWWdeBAJfg0tqoz4uUoIgCNepKnnDQ= github.com/nats-io/nats-server/v2 v2.7.2-0.20220201222209-2ed7a812d8b2/go.mod h1:tckmrt0M6bVaDT3kmh9UrIq/CBOBBse+TpXQi5ldaa8= github.com/nats-io/nats.go v1.13.1-0.20220121202836-972a071d373d h1:GRSmEJutHkdoxKsRypP575IIdoXe7Bm6yHQF6GcDBnA=