Skip to content

Commit

Permalink
drop producer reconnect logic
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Oct 5, 2018
1 parent f83042b commit 09d8c3a
Showing 1 changed file with 4 additions and 35 deletions.
39 changes: 4 additions & 35 deletions kafkahook/kafkahook.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,47 +88,16 @@ func New(conf Config) (*KafkaHook, error) {
go func() {
for {
select {
case prodErr := <-conf.Producer.Errors():
fmt.Fprintf(os.Stderr, "[kafkahook] producer error '%s'; reconnecting producer...\n", prodErr.Err)

// Close any previously open producers
if conf.Producer != nil {
// Since `Producer.Return.Errors` is true, `Producer.Close()` will send a shutdown error which
// could cause `Close()` to block unless we read it. Also there could be other errors
// that are waiting to be received and must be sent before shutdown is complete.
go func() {
for range conf.Producer.Errors() {}
}()

conf.Producer.Close()
conf.Producer = nil
}
case err := <-conf.Producer.Errors():
msg, _ := err.Msg.Value.Encode()
fmt.Fprintf(os.Stderr, "[kafkahook] produce error '%s' for: %s\n", err.Err, string(msg))

var backOff int
for {
// Attempt to reconnect to the brokers
conf.Producer, err = sarama.NewAsyncProducer(conf.Endpoints, kafkaConfig)
if err != nil {
if backOff < 6 {
backOff++
}
fmt.Fprintf(os.Stderr, "[kafkahook] reconnect error: %s; sleeping (%d)...\n", err, backOff)
time.Sleep(time.Duration(backOff) * time.Second)
continue
}
backOff = 0
break
}
case buf, ok := <-h.produce:
if !ok {
h.wg.Done()
// See comment above
go func() {
for range conf.Producer.Errors() {}
}()
if err := conf.Producer.Close(); err != nil {
fmt.Fprintf(os.Stderr, "[kafkahook] producer close error: %s\n", err)
}
h.wg.Done()
return
}
conf.Producer.Input() <- &sarama.ProducerMessage{
Expand Down

0 comments on commit 09d8c3a

Please sign in to comment.