-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.go
105 lines (77 loc) · 2.17 KB
/
example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"context"
"fmt"
"os"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/kapvode/gokogeri"
"github.com/kapvode/gokogeri/redis"
)
// devLogger builds a zerolog.Logger for local development. It writes through a
// zerolog.ConsoleWriter to stdout, formats times as HH:MM:SS, is set to the
// info level, and attaches a timestamp to every event.
func devLogger() zerolog.Logger {
	console := zerolog.ConsoleWriter{
		Out:        os.Stdout,
		TimeFormat: "15:04:05",
	}
	base := zerolog.New(console).Level(zerolog.InfoLevel)
	return base.With().Timestamp().Logger()
}
func main() {
cfg := redis.NewDefaultConfig()
cfg.URL = "redis://localhost/4"
cm := redis.NewConnManager(cfg)
defer cm.Close()
ctx := context.Background()
var job1 gokogeri.Job
job1.SetQueue("critical")
job1.SetClass("CriticalJob")
var job2 gokogeri.Job
job2.SetQueue("low_priority")
job2.SetClass("LowPriorityJob")
enqueuer := gokogeri.NewEnqueuer(cm)
err := enqueuer.Enqueue(ctx, &job1)
if err != nil {
fmt.Println("enqueue job1:", err)
}
err = enqueuer.Enqueue(ctx, &job2)
if err != nil {
fmt.Println("enqueue job2:", err)
}
logger := gokogeri.DefaultLogger()
// Alternatively
// logger := zerolog.Nop()
// logger := devLogger()
node := gokogeri.NewNode(logger, cm, cfg.LongPollTimeout)
node.ProcessQueues(
gokogeri.OrderedQueueSet{"critical"},
gokogeri.WorkerFunc(func(ctx context.Context, j *gokogeri.Job) error {
fmt.Printf("processing job: queue=%s class=%s id=%s\n", j.Queue(), j.Class(), j.ID())
time.Sleep(time.Second * 5)
fmt.Printf("job done: queue=%s class=%s id=%s\n", j.Queue(), j.Class(), j.ID())
return nil
}),
1,
)
qs := gokogeri.NewRandomQueueSet()
qs.Add("low_priority", 1)
qs.Add("high_priority", 3)
node.ProcessQueues(
qs,
gokogeri.WorkerFunc(func(ctx context.Context, j *gokogeri.Job) error {
fmt.Printf("processing job: queue=%s class=%s id=%s\n", j.Queue(), j.Class(), j.ID())
time.Sleep(time.Second * 7)
fmt.Printf("job done: queue=%s class=%s id=%s\n", j.Queue(), j.Class(), j.ID())
return nil
}),
5,
)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
node.Run()
}()
// Give the jobs a chance to start running before we initiate shutdown, since this is just an example.
time.Sleep(time.Second)
shutdownCtx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
node.Stop(shutdownCtx)
wg.Wait()
}