Skip to content

Commit

Permalink
improvement(kafka): support async client creation
Browse files Browse the repository at this point in the history
  • Loading branch information
yinheli committed Aug 27, 2021
1 parent 86d61fb commit a62e78e
Showing 1 changed file with 38 additions and 27 deletions.
65 changes: 38 additions & 27 deletions pkg/kafkax/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ type config struct {
}

type clientWrapper struct {
client sarama.Client
name string
producer sarama.SyncProducer
countVec *prometheus.CounterVec
histogram prometheus.Histogram
config config
client sarama.Client
name string
syncProducer sarama.SyncProducer
countVec *prometheus.CounterVec
histogram prometheus.Histogram
}

func Config(v *viper.Viper) {
Expand All @@ -62,19 +63,20 @@ func Config(v *viper.Viper) {

config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.MaxMessageBytes = 1024 * 1024 * 8
config.Producer.MaxMessageBytes = 1024 * 1024 * 10

config.ClientID = clientId
var (
client sarama.Client
producer sarama.SyncProducer
err error
client sarama.Client
syncProducer sarama.SyncProducer
err error
)
if client, err = sarama.NewClient(c.Broker, config); err != nil {
log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name))
}

if producer, err = sarama.NewSyncProducerFromClient(client); err != nil {
log.Fatal("init kafka producer error", zap.Error(err), zap.String("name", c.Name))
if syncProducer, err = sarama.NewSyncProducerFromClient(client); err != nil {
log.Fatal("init kafka sync producer error", zap.Error(err), zap.String("name", c.Name))
}

subsystem := fmt.Sprintf("kafka_%s", strings.ReplaceAll(c.Name, "-", "_"))
Expand All @@ -94,11 +96,12 @@ func Config(v *viper.Viper) {
})

clients[c.Name] = &clientWrapper{
name: c.Name,
client: client,
producer: producer,
countVec: countVec,
histogram: histogram,
config: c,
name: c.Name,
client: client,
syncProducer: syncProducer,
countVec: countVec,
histogram: histogram,
}
}
}
Expand All @@ -114,19 +117,17 @@ func GetClient(name ...string) sarama.Client {
func GetProducer(name ...string) sarama.SyncProducer {
wrap := getClientWrap(name...)
if wrap != nil {
return wrap.producer
return wrap.syncProducer
}
return nil
}

func getClientWrap(name ...string) *clientWrapper {
var wrap *clientWrapper
if name != nil {
wrap = clients[name[0]]
} else {
wrap = clients["default"]
func NewAsyncProducer(name ...string) (sarama.AsyncProducer, error) {
wrap := getClientWrap(name...)
if wrap != nil {
return sarama.NewAsyncProducerFromClient(wrap.client)
}
return wrap
return nil, errors.Errorf("client not found, clientName:%v", name)
}

func SendMessage(ctx context.Context, message *sarama.ProducerMessage, clientName ...string) (err error) {
Expand All @@ -149,14 +150,14 @@ func SendMessage(ctx context.Context, message *sarama.ProducerMessage, clientNam
err = producerNotFoundErr
return
}
producer := cw.producer
if producer == nil {
syncProducer := cw.syncProducer
if syncProducer == nil {
err = producerNotFoundErr
return
}
start := time.Now()
cw.countVec.WithLabelValues(message.Topic).Inc()
_, _, err = producer.SendMessage(message)
_, _, err = syncProducer.SendMessage(message)
duration := time.Since(start).Milliseconds()
cw.histogram.Observe(float64(duration))
if err != nil {
Expand Down Expand Up @@ -213,3 +214,13 @@ func StartGroupConsume(ctx context.Context, group string, topics []string, handl
}
}()
}

func getClientWrap(name ...string) *clientWrapper {
var wrap *clientWrapper
if name != nil {
wrap = clients[name[0]]
} else {
wrap = clients["default"]
}
return wrap
}

0 comments on commit a62e78e

Please sign in to comment.