diff --git a/pkg/kafkax/kafka.go b/pkg/kafkax/kafka.go index acd64f9..022b50b 100644 --- a/pkg/kafkax/kafka.go +++ b/pkg/kafkax/kafka.go @@ -2,187 +2,144 @@ package kafkax import ( "context" - "fmt" - "os" - "strings" + "sync" "time" "github.com/Shopify/sarama" - "github.com/getsentry/sentry-go" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/spf13/viper" "go.uber.org/zap" - "github.com/xinpianchang/xservice/core" "github.com/xinpianchang/xservice/pkg/log" ) var ( - clients map[string]*clientWrapper + configMap = make(map[string]mqConfig, 8) ) -type config struct { +type mqConfig struct { Name string `yaml:"name"` Version string `yaml:"version"` Broker []string `yaml:"broker"` } -type clientWrapper struct { - config config - client sarama.Client - name string - syncProducer sarama.SyncProducer - countVec *prometheus.CounterVec - histogram prometheus.Histogram -} - func Config(v *viper.Viper) { - var configs []config + var configs []mqConfig if err := v.UnmarshalKey("mq", &configs); err != nil { log.Fatal("unmarshal kafka config", zap.Error(err)) } - serviceName := os.Getenv(core.EnvServiceName) + for _, it := range configs { + configMap[it.Name] = it + } +} - clients = make(map[string]*clientWrapper, len(configs)) +// Client is a kafka client wrapper +type Client interface { + // Get get kafka client + Get() sarama.Client - hostname, _ := os.Hostname() - clientId := fmt.Sprint(serviceName, "_", hostname, "_", os.Getpid()) + // SendMessage send message to kafka + SendMessage(ctx context.Context, message *sarama.ProducerMessage) error - for _, c := range configs { - config := sarama.NewConfig() - if version, err := sarama.ParseKafkaVersion(c.Version); err != nil { - log.Fatal("kafka config", zap.Error(err)) - } else { - config.Version = version - } + // GroupConsume consume kafka group + GroupConsume(ctx context.Context, group string, topics []string, handler sarama.ConsumerGroupHandler) error +} - config.Consumer.Return.Errors = true - config.Producer.Return.Successes = true - config.Producer.MaxMessageBytes = 1024 * 1024 * 10 - - config.ClientID = clientId - var ( - client sarama.Client - syncProducer sarama.SyncProducer - err error - ) - if client, err = sarama.NewClient(c.Broker, config); err != nil { - log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name)) - } +type defaultKafka struct { + config mqConfig + client sarama.Client + producerInitializeOnce sync.Once + producer sarama.SyncProducer +} - if syncProducer, err = sarama.NewSyncProducerFromClient(client); err != nil { - log.Fatal("init kafka sync producer error", zap.Error(err), zap.String("name", c.Name)) - } +// New create a kafka client +func New(name string, cfg ...*sarama.Config) (Client, error) { + c, ok := configMap[name] + if !ok { + return nil, errors.Errorf("configuration not found, name: %v", name) + } - subsystem := fmt.Sprintf("kafka_%s", strings.ReplaceAll(c.Name, "-", "_")) - countVec := promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: serviceName, - Subsystem: subsystem, - Name: "send_total", - Help: "Number of kafka message sent", - }, []string{"topic"}) - - histogram := promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: serviceName, - Subsystem: subsystem, - Name: "send_duration_millisecond", - Help: "Send duration", - Buckets: []float64{20, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000}, - }) - - clients[c.Name] = &clientWrapper{ - config: c, - name: c.Name, - client: client, - syncProducer: syncProducer, - countVec: countVec, - histogram: histogram, - } + kafka := &defaultKafka{ + config: c, } -} -func GetClient(name ...string) sarama.Client { - wrap := getClientWrap(name...) - if wrap != nil { - return wrap.client + var config *sarama.Config + if len(cfg) > 0 { + config = cfg[0] + } + if config == nil { + config = sarama.NewConfig() } - return nil -} -func GetProducer(name ...string) sarama.SyncProducer { - wrap := getClientWrap(name...) - if wrap != nil { - return wrap.syncProducer + if version, err := sarama.ParseKafkaVersion(c.Version); err != nil { + log.Fatal("kafka config", zap.Error(err)) + } else { + config.Version = version } - return nil -} -func NewAsyncProducer(name ...string) (sarama.AsyncProducer, error) { - wrap := getClientWrap(name...) - if wrap != nil { - return sarama.NewAsyncProducerFromClient(wrap.client) + config.Consumer.Return.Errors = true + config.Producer.Return.Successes = true + + maxMessageBytes := 1024 * 1024 * 10 + if config.Producer.MaxMessageBytes < maxMessageBytes { + config.Producer.MaxMessageBytes = maxMessageBytes } - return nil, errors.Errorf("client not found, clientName:%v", name) + + if client, err := sarama.NewClient(c.Broker, config); err != nil { + log.Fatal("init kafka client error", zap.Error(err), zap.String("name", c.Name)) + } else { + kafka.client = client + } + + return kafka, nil +} + +// Get get kafka client +func (t *defaultKafka) Get() sarama.Client { + return t.client } -func SendMessage(ctx context.Context, message *sarama.ProducerMessage, clientName ...string) (err error) { +// SendMessage send message to kafka +func (t *defaultKafka) SendMessage(ctx context.Context, message *sarama.ProducerMessage) (err error) { + if err := t.initializeProducer(ctx); err != nil { + return err + } + if span := opentracing.SpanFromContext(ctx); span != nil { ctx = opentracing.ContextWithSpan(context.Background(), span) span, _ = opentracing.StartSpanFromContext(ctx, "kafka_send") defer func() { if err != nil { ext.Error.Set(span, true) - span.LogKV("err", err, "clientName", clientName) + span.LogKV("client_name", t.config.Name, "err", err) } span.Finish() }() } - producerNotFoundErr := errors.Errorf("producer not found, clientName:%v", clientName) + _, _, err = t.producer.SendMessage(message) - cw := getClientWrap(clientName...) - if cw == nil { - err = producerNotFoundErr - return - } - syncProducer := cw.syncProducer - if syncProducer == nil { - err = producerNotFoundErr - return - } - start := time.Now() - cw.countVec.WithLabelValues(message.Topic).Inc() - _, _, err = syncProducer.SendMessage(message) - duration := time.Since(start).Milliseconds() - cw.histogram.Observe(float64(duration)) - if err != nil { - sentry.CaptureException(errors.WithMessage(err, fmt.Sprint("kafka send message to topic:", message.Topic))) - } return } -func StartGroupConsume(ctx context.Context, group string, topics []string, handler sarama.ConsumerGroupHandler, name ...string) { - l := log.Named("kafka consumer").With(zap.String("groupID", group), zap.Strings("topics", topics)) +// GroupConsume consume kafka group +func (t *defaultKafka) GroupConsume(ctx context.Context, group string, topics []string, handler sarama.ConsumerGroupHandler) error { + l := log.Named("kafka consumer").With( + zap.String("name", t.config.Name), + zap.String("groupId", group), + zap.Strings("topics", topics), + ) if group == "" || len(topics) == 0 { - return + return nil } - client := GetClient(name...) - if client == nil { - log.Fatal("kafkaClient not found", zap.Any("name", name)) - return - } - - consumer, err := sarama.NewConsumerGroupFromClient(group, client) + consumer, err := sarama.NewConsumerGroupFromClient(group, t.client) if err != nil { - l.Fatal("init consume group error", zap.Error(err)) - return + return err } go func() { @@ -205,7 +162,7 @@ func StartGroupConsume(ctx context.Context, group string, topics []string, handl default: // pass } - l.Debug("start consume client ...") + l.Debug("start consume") err = consumer.Consume(ctx, topics, handler) if err != nil { l.Warn("consume", zap.Error(err)) @@ -213,14 +170,13 @@ func StartGroupConsume(ctx context.Context, group string, topics []string, handl } } }() + + return nil } -func getClientWrap(name ...string) *clientWrapper { - var wrap *clientWrapper - if name != nil { - wrap = clients[name[0]] - } else { - wrap = clients["default"] - } - return wrap +func (t *defaultKafka) initializeProducer(ctx context.Context) (err error) { + t.producerInitializeOnce.Do(func() { + t.producer, err = sarama.NewSyncProducerFromClient(t.client) + }) + return }