From 09d8c3ad9cd4ee9cbb56e687c2f1b54a7e8c9d90 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Fri, 5 Oct 2018 11:49:41 -0500 Subject: [PATCH] drop producer reconnect logic --- kafkahook/kafkahook.go | 39 ++++----------------------------------- 1 file changed, 4 insertions(+), 35 deletions(-) diff --git a/kafkahook/kafkahook.go b/kafkahook/kafkahook.go index ca9ca64..33b4728 100644 --- a/kafkahook/kafkahook.go +++ b/kafkahook/kafkahook.go @@ -88,47 +88,16 @@ func New(conf Config) (*KafkaHook, error) { go func() { for { select { - case prodErr := <-conf.Producer.Errors(): - fmt.Fprintf(os.Stderr, "[kafkahook] producer error '%s'; reconnecting producer...\n", prodErr.Err) - - // Close any previously open producers - if conf.Producer != nil { - // Since `Producer.Return.Errors` is true, `Producer.Close()` will send a shutdown error which - // could cause `Close()` to block unless we read it. Also there could be other errors - // that are waiting to be received and must be sent before shutdown is complete. - go func() { - for range conf.Producer.Errors() {} - }() - - conf.Producer.Close() - conf.Producer = nil - } + case err := <-conf.Producer.Errors(): + msg, _ := err.Msg.Value.Encode() + fmt.Fprintf(os.Stderr, "[kafkahook] produce error '%s' for: %s\n", err.Err, string(msg)) - var backOff int - for { - // Attempt to reconnect to the brokers - conf.Producer, err = sarama.NewAsyncProducer(conf.Endpoints, kafkaConfig) - if err != nil { - if backOff < 6 { - backOff++ - } - fmt.Fprintf(os.Stderr, "[kafkahook] reconnect error: %s; sleeping (%d)...\n", err, backOff) - time.Sleep(time.Duration(backOff) * time.Second) - continue - } - backOff = 0 - break - } case buf, ok := <-h.produce: if !ok { - h.wg.Done() - // See comment above - go func() { - for range conf.Producer.Errors() {} - }() if err := conf.Producer.Close(); err != nil { fmt.Fprintf(os.Stderr, "[kafkahook] producer close error: %s\n", err) } + h.wg.Done() return } conf.Producer.Input() <- &sarama.ProducerMessage{