Commit 55a994d

refactor(kafkax): more efficient to use sarama client

yinheli committed Aug 30, 2021
1 parent a62e78e commit 55a994d
Showing 1 changed file with 86 additions and 130 deletions.
pkg/kafkax/kafka.go: 216 changes (86 additions, 130 deletions)
@@ -2,187 +2,144 @@ package kafkax

import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
"github.com/getsentry/sentry-go"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/xinpianchang/xservice/core"
"github.com/xinpianchang/xservice/pkg/log"
)

var (
clients map[string]*clientWrapper
configMap = make(map[string]mqConfig, 8)
)

type config struct {
type mqConfig struct {
Name string `yaml:"name"`
Version string `yaml:"version"`
Broker []string `yaml:"broker"`
}

type clientWrapper struct {
config config
client sarama.Client
name string
syncProducer sarama.SyncProducer
countVec *prometheus.CounterVec
histogram prometheus.Histogram
}

func Config(v *viper.Viper) {
var configs []config
var configs []mqConfig
if err := v.UnmarshalKey("mq", &configs); err != nil {
log.Fatal("unmarshal kafka config", zap.Error(err))
}

serviceName := os.Getenv(core.EnvServiceName)
for _, it := range configs {
configMap[it.Name] = it
}
}

clients = make(map[string]*clientWrapper, len(configs))
// Client is a kafka client wrapper
type Client interface {
// Get returns the underlying sarama client
Get() sarama.Client

hostname, _ := os.Hostname()
clientId := fmt.Sprint(serviceName, "_", hostname, "_", os.Getpid())
// SendMessage sends a message to kafka
SendMessage(ctx context.Context, message *sarama.ProducerMessage) error

for _, c := range configs {
config := sarama.NewConfig()
if version, err := sarama.ParseKafkaVersion(c.Version); err != nil {
log.Fatal("kafka config", zap.Error(err))
} else {
config.Version = version
}
// GroupConsume consumes the given topics as part of a consumer group
GroupConsume(ctx context.Context, group string, topics []string, handler sarama.ConsumerGroupHandler) error
}

config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true
config.Producer.MaxMessageBytes = 1024 * 1024 * 10

config.ClientID = clientId
var (
client sarama.Client
syncProducer sarama.SyncProducer
err error
)
if client, err = sarama.NewClient(c.Broker, config); err != nil {
log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name))
}
type defaultKafka struct {
config mqConfig
client sarama.Client
producerInitializeOnce sync.Once
producer sarama.SyncProducer
}

if syncProducer, err = sarama.NewSyncProducerFromClient(client); err != nil {
log.Fatal("init kafka sync producer error", zap.Error(err), zap.String("name", c.Name))
}
// New creates a kafka client for the named configuration
func New(name string, cfg ...*sarama.Config) (Client, error) {
c, ok := configMap[name]
if !ok {
return nil, errors.Errorf("configuration not found, name: %v", name)
}

subsystem := fmt.Sprintf("kafka_%s", strings.ReplaceAll(c.Name, "-", "_"))
countVec := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: serviceName,
Subsystem: subsystem,
Name: "send_total",
Help: "Number of kafka message sent",
}, []string{"topic"})

histogram := promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: serviceName,
Subsystem: subsystem,
Name: "send_duration_millisecond",
Help: "Send duration",
Buckets: []float64{20, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000},
})

clients[c.Name] = &clientWrapper{
config: c,
name: c.Name,
client: client,
syncProducer: syncProducer,
countVec: countVec,
histogram: histogram,
}
kafka := &defaultKafka{
config: c,
}
}

func GetClient(name ...string) sarama.Client {
wrap := getClientWrap(name...)
if wrap != nil {
return wrap.client
var config *sarama.Config
if len(cfg) > 0 {
config = cfg[0]
}
if config == nil {
config = sarama.NewConfig()
}
return nil
}

func GetProducer(name ...string) sarama.SyncProducer {
wrap := getClientWrap(name...)
if wrap != nil {
return wrap.syncProducer
if version, err := sarama.ParseKafkaVersion(c.Version); err != nil {
log.Fatal("kafka config", zap.Error(err))
} else {
config.Version = version
}
return nil
}

func NewAsyncProducer(name ...string) (sarama.AsyncProducer, error) {
wrap := getClientWrap(name...)
if wrap != nil {
return sarama.NewAsyncProducerFromClient(wrap.client)
config.Consumer.Return.Errors = true
config.Producer.Return.Successes = true

maxMessageBytes := 1024 * 1024 * 10
if config.Producer.MaxMessageBytes < maxMessageBytes {
config.Producer.MaxMessageBytes = maxMessageBytes
}
return nil, errors.Errorf("client not found, clientName:%v", name)

if client, err := sarama.NewClient(c.Broker, config); err != nil {
log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name))
} else {
kafka.client = client
}

return kafka, nil
}

// Get returns the underlying sarama client
func (t *defaultKafka) Get() sarama.Client {
return t.client
}

func SendMessage(ctx context.Context, message *sarama.ProducerMessage, clientName ...string) (err error) {
// SendMessage sends a message to kafka
func (t *defaultKafka) SendMessage(ctx context.Context, message *sarama.ProducerMessage) (err error) {
if err := t.initializeProducer(ctx); err != nil {
return err
}

if span := opentracing.SpanFromContext(ctx); span != nil {
ctx = opentracing.ContextWithSpan(context.Background(), span)
span, _ = opentracing.StartSpanFromContext(ctx, "kafka_send")
defer func() {
if err != nil {
ext.Error.Set(span, true)
span.LogKV("err", err, "clientName", clientName)
span.LogKV("client_name", t.config.Name, "err", err)
}
span.Finish()
}()
}

producerNotFoundErr := errors.Errorf("producer not found, clientName:%v", clientName)
_, _, err = t.producer.SendMessage(message)

cw := getClientWrap(clientName...)
if cw == nil {
err = producerNotFoundErr
return
}
syncProducer := cw.syncProducer
if syncProducer == nil {
err = producerNotFoundErr
return
}
start := time.Now()
cw.countVec.WithLabelValues(message.Topic).Inc()
_, _, err = syncProducer.SendMessage(message)
duration := time.Since(start).Milliseconds()
cw.histogram.Observe(float64(duration))
if err != nil {
sentry.CaptureException(errors.WithMessage(err, fmt.Sprint("kafka send message to topic:", message.Topic)))
}
return
}

func StartGroupConsume(ctx context.Context, group string, topics []string, handler sarama.ConsumerGroupHandler, name ...string) {
l := log.Named("kafka consumer").With(zap.String("groupID", group), zap.Strings("topics", topics))
// GroupConsume consumes the given topics as part of a consumer group
func (t *defaultKafka) GroupConsume(ctx context.Context, group string, topics []string, handler sarama.ConsumerGroupHandler) error {
l := log.Named("kafka consumer").With(
zap.String("name", t.config.Name),
zap.String("groupId", group),
zap.Strings("topics", topics),
)

if group == "" || len(topics) == 0 {
return
return nil
}

client := GetClient(name...)
if client == nil {
log.Fatal("kafkaClient not found", zap.Any("name", name))
return
}

consumer, err := sarama.NewConsumerGroupFromClient(group, client)
consumer, err := sarama.NewConsumerGroupFromClient(group, t.client)
if err != nil {
l.Fatal("init consume group error", zap.Error(err))
return
return err
}

go func() {
@@ -205,22 +162,21 @@ func StartGroupConsume(ctx context.Context, group string, topics []string, handl
default:
// pass
}
l.Debug("start consume client ...")
l.Debug("start consume")
err = consumer.Consume(ctx, topics, handler)
if err != nil {
l.Warn("consume", zap.Error(err))
time.Sleep(time.Second * 5)
}
}
}()

return nil
}

func getClientWrap(name ...string) *clientWrapper {
var wrap *clientWrapper
if name != nil {
wrap = clients[name[0]]
} else {
wrap = clients["default"]
}
return wrap
func (t *defaultKafka) initializeProducer(ctx context.Context) (err error) {
t.producerInitializeOnce.Do(func() {
t.producer, err = sarama.NewSyncProducerFromClient(t.client)
})
return
}
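
For context, a minimal sketch of how the refactored package might be configured, assuming the package import path github.com/xinpianchang/xservice/pkg/kafkax and a hypothetical "mq" section whose keys follow the mqConfig yaml tags shown in the diff (name, version, broker); the entry name and broker address are placeholders:

package main

import (
	"strings"

	"github.com/spf13/viper"

	"github.com/xinpianchang/xservice/pkg/kafkax"
)

func main() {
	// Placeholder config: one entry named "default" with a single local broker.
	raw := `
mq:
  - name: default
    version: 2.6.0
    broker:
      - 127.0.0.1:9092
`
	v := viper.New()
	v.SetConfigType("yaml")
	if err := v.ReadConfig(strings.NewReader(raw)); err != nil {
		panic(err)
	}

	// Config registers each "mq" entry so it can later be looked up by name via New.
	kafkax.Config(v)
}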

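And a minimal sketch of producing through the new Client interface, assuming the same placeholder entry name; GroupConsume would be driven similarly by passing a sarama.ConsumerGroupHandler implementation:

package main

import (
	"context"

	"github.com/Shopify/sarama"

	"github.com/xinpianchang/xservice/pkg/kafkax"
)

func main() {
	// "default" refers to the placeholder entry registered earlier via kafkax.Config.
	client, err := kafkax.New("default")
	if err != nil {
		panic(err)
	}

	// The sync producer is created lazily on the first SendMessage call
	// (see initializeProducer and its sync.Once guard in the diff above).
	msg := &sarama.ProducerMessage{
		Topic: "demo-topic",
		Value: sarama.StringEncoder("hello"),
	}
	if err := client.SendMessage(context.Background(), msg); err != nil {
		panic(err)
	}
}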