-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathmain.go
133 lines (116 loc) · 3.99 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"flag"
"fmt"
"github.com/bitly/go-nsq"
"github.com/bitly/nsq/internal/app"
"github.com/bitly/nsq/internal/version"
log "github.com/cihub/seelog"
"os"
"os/signal"
"syscall"
"time"
)
// Command-line configuration. The scalar options are registered on the
// default flag set while these package-level variables are initialised; the
// three app.StringArray values at the bottom are registered by init().
var (
	// General:
	showVersion = flag.Bool("version", false, "print version string")

	// NSQ subscription. main() lowers maxInFlight to bucketMessages when the
	// latter is positive and smaller.
	topic           = flag.String("topic", "", "NSQ topic")
	channel         = flag.String("channel", "", "NSQ channel")
	maxInFlight     = flag.Int("max-in-flight", 1000, "max number of messages to allow in flight (before flushing)")
	maxInFlightTime = flag.Int("max-in-flight-time", 60, "max time to keep messages in flight (before flushing)")

	// Batching between flushes. main() treats any batchMode other than
	// "disk" or "channel" as the in-memory mode.
	bucketMessages        = flag.Int("bucket-messages", 0, "total number of messages to bucket")
	bucketSeconds         = flag.Int("bucket-seconds", 600, "total time to bucket messages for (seconds)")
	batchMode             = flag.String("batchmode", "memory", "How to batch the messages between flushes [disk, memory, channel]")
	messageBufferFileName = flag.String("bufferfile", "", "Local file to buffer messages in between flushes to S3")

	// S3 destination. NOTE(review): the "extention" spelling is part of the
	// public flag name, so it cannot be corrected without breaking callers.
	s3Bucket        = flag.String("s3bucket", "", "S3 bucket-name to store the output on (eg 'nsq-archive'")
	s3Path          = flag.String("s3path", "", "S3 path to store files under (eg '/nsq-archive'")
	awsRegion       = flag.String("awsregion", "us-east-1", "The AWS region-name to connect to")
	s3FileExtention = flag.String("extention", "txt", "Extention for files on S3")

	// Repeatable options (see init()):
	consumerOpts     = app.StringArray{}
	nsqdTCPAddrs     = app.StringArray{}
	lookupdHTTPAddrs = app.StringArray{}
)
// init registers the flags whose help text says they may be given multiple
// times. Each is backed by a package-level app.StringArray passed to flag.Var.
func init() {
	repeatable := []struct {
		target flag.Value
		name   string
		usage  string
	}{
		{&consumerOpts, "consumer-opt", "option to passthrough to nsq.Consumer (may be given multiple times, http://godoc.org/github.com/bitly/go-nsq#Config)"},
		{&nsqdTCPAddrs, "nsqd-tcp-address", "nsqd TCP address (may be given multiple times)"},
		{&lookupdHTTPAddrs, "lookupd-http-address", "lookupd HTTP address (may be given multiple times)"},
	}
	for _, opt := range repeatable {
		flag.Var(opt.target, opt.name, opt.usage)
	}
}
// main wires an NSQ consumer to one of the batching handlers and then blocks
// until the consumer has stopped.
//
// Steps:
//  1. Validate the command line with processArguments (defined elsewhere in
//     this package); exit with status 1 if it reports a problem.
//  2. Cap max-in-flight at bucket-messages when a smaller bucket is requested.
//  3. Build the consumer and attach the handler selected by -batchmode
//     ("disk", the unimplemented "channel", anything else means in-memory).
//  4. Connect to the configured nsqd and nsqlookupd addresses.
//  5. Wait. SIGINT/SIGTERM asks the consumer to stop; main returns once the
//     consumer signals completion on StopChan.
//
// Configuration and connection errors panic.
func main() {
	// Flush buffered log output on every return and panic path. os.Exit does
	// NOT run deferred calls, so the explicit exit below flushes by hand.
	defer log.Flush()

	// Process the arguments:
	if argumentIssue := processArguments(); argumentIssue {
		log.Flush()
		os.Exit(1)
	}

	// Intercept quit signals:
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	// Don't ask for more messages than we want:
	if *bucketMessages > 0 && *bucketMessages < *maxInFlight {
		*maxInFlight = *bucketMessages
	}

	// Set up the NSQ client:
	cfg := nsq.NewConfig()
	cfg.UserAgent = fmt.Sprintf("nsq_to_s3/%s go-nsq/%s", version.Binary, nsq.VERSION)
	if err := app.ParseOpts(cfg, consumerOpts); err != nil {
		panic(err)
	}
	// Applied after ParseOpts, so the (possibly capped) flag value is what
	// ends up in the config.
	cfg.MaxInFlight = *maxInFlight

	consumer, err := nsq.NewConsumer(*topic, *channel, cfg)
	if err != nil {
		panic(err)
	}

	// See which mode we've been asked to run in:
	switch *batchMode {
	case "disk":
		// On-disk:
		now := int(time.Now().Unix())
		consumer.AddHandler(&OnDiskHandler{
			allTimeMessages:       0,
			deDuper:               make(map[string]int),
			inFlightMessages:      make([]*nsq.Message, 0),
			timeLastFlushedToS3:   now,
			timeLastFlushedToDisk: now,
		})
	case "channel":
		panic("'channel' batch-mode isn't implemented yet!")
	default:
		// Default to in-memory:
		consumer.AddHandler(&InMemoryHandler{
			allTimeMessages:     0,
			deDuper:             make(map[string]int),
			messageBuffer:       make([]*nsq.Message, 0),
			timeLastFlushedToS3: int(time.Now().Unix()),
		})
	}

	// Configure the NSQ connection with the list of NSQd addresses:
	if err := consumer.ConnectToNSQDs(nsqdTCPAddrs); err != nil {
		panic(err)
	}

	// Configure the NSQ connection with the list of Lookupd HTTP addresses:
	if err := consumer.ConnectToNSQLookupds(lookupdHTTPAddrs); err != nil {
		panic(err)
	}

	// Handle stop / quit events:
	for {
		select {
		case <-consumer.StopChan:
			// The consumer has finished stopping; returning runs the
			// deferred log flush and exits with status 0.
			return
		case <-sigChan:
			// Ask the consumer to stop, then keep looping until StopChan
			// fires. Exiting here would kill the process before the stop
			// completes and would skip the deferred log flush.
			consumer.Stop()
			// Stop intercepting signals so a second SIGINT/SIGTERM gets the
			// default behaviour and can still force the process down.
			signal.Stop(sigChan)
		}
	}
}