diff --git a/core/xservice/server.go b/core/xservice/server.go index 429477f..f7b5363 100644 --- a/core/xservice/server.go +++ b/core/xservice/server.go @@ -8,6 +8,7 @@ import ( "os" "os/signal" "runtime" + "strings" "syscall" "time" @@ -191,10 +192,9 @@ func (t *serverImpl) waitSignalForTableflip(upg *tableflip.Upgrader) { } func (t *serverImpl) initEcho() { - e := t.newEcho() + e := t.newEcho("http") echox.ConfigValidator(e) - e.Use(middleware.Trace(t.options.Config.GetBool("jaeger.body_dump"), t.options.EchoTracingSkipper)) e.GET("/metrics", echo.WrapHandler(promhttp.Handler())) e.Group("/debug/*", middleware.Pprof()) @@ -236,7 +236,7 @@ func (t *serverImpl) initGrpc() { )) // echo instance for grpc-gateway, which wrap another echo instance, for gRPC service not found fallback serve - e := t.newEcho() + e := t.newEcho("grpc_gateway") e.Use(echo.WrapMiddleware(func(handler http.Handler) http.Handler { return t.grpcGateway })) @@ -244,7 +244,7 @@ func (t *serverImpl) initGrpc() { t.httpHandler = e } -func (t *serverImpl) newEcho() *echo.Echo { +func (t *serverImpl) newEcho(subsystem string) *echo.Echo { e := echo.New() e.Logger = log.NewEchoLogger() @@ -266,6 +266,8 @@ func (t *serverImpl) newEcho() *echo.Echo { e.Use(echomd.RequestID()) e.Use(sentryecho.New(sentryecho.Options{Repanic: true})) + e.Use(middleware.Trace(t.options.Config.GetBool("jaeger.body_dump"), t.options.EchoTracingSkipper)) + e.Use(middleware.Prometheus(strings.ReplaceAll(t.options.Name, "-", "_"), subsystem)) // logger id & traceId & server-info e.Use(func(next echo.HandlerFunc) echo.HandlerFunc { diff --git a/pkg/kafkax/kafka.go b/pkg/kafkax/kafka.go index e1daa78..41e32a9 100644 --- a/pkg/kafkax/kafka.go +++ b/pkg/kafkax/kafka.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "strings" "time" "github.com/Shopify/sarama" @@ -21,9 +22,7 @@ import ( ) var ( - clients map[string]*clientWrapper - topicCountVec *prometheus.CounterVec - sendDurationsHistogram prometheus.Histogram + clients map[string]*clientWrapper ) type config struct { @@ -33,9 +32,11 @@ type config struct { } type clientWrapper struct { - client sarama.Client - name string - producer sarama.SyncProducer + client sarama.Client + name string + producer sarama.SyncProducer + countVec *prometheus.CounterVec + histogram prometheus.Histogram } func Config(v *viper.Viper) { @@ -46,21 +47,6 @@ func Config(v *viper.Viper) { serviceName := os.Getenv(core.EnvServiceName) - topicCountVec = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: serviceName, - Subsystem: "kafka", - Name: "send_total", - Help: "Number of kafka message sent", - }, []string{"topic"}) - - sendDurationsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: serviceName, - Subsystem: "kafka", - Name: "send_duration_millisecond", - Help: "Send duration", - Buckets: []float64{20, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000}, - }) - clients = make(map[string]*clientWrapper, len(configs)) hostname, _ := os.Hostname() @@ -90,10 +76,28 @@ func Config(v *viper.Viper) { log.Fatal("init kafka producer error", zap.Error(err), zap.String("name", c.Name)) } + subsystem := fmt.Sprintf("kafka_%s", strings.ReplaceAll(c.Name, "-", "_")) + countVec := promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: serviceName, + Subsystem: subsystem, + Name: "send_total", + Help: "Number of kafka message sent", + }, []string{"topic"}) + + histogram := promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: serviceName, + Subsystem: subsystem, + Name: "send_duration_millisecond", + Help: "Send duration", + Buckets: []float64{20, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000}, + }) + clients[c.Name] = &clientWrapper{ - name: c.Name, - client: client, - producer: producer, + name: c.Name, + client: client, + producer: producer, + countVec: countVec, + histogram: histogram, } } } @@ -137,16 +141,23 @@ func SendMessage(ctx context.Context, message *sarama.ProducerMessage, clientNam }() } - producer := GetProducer(clientName...) + producerNotFoundErr := errors.Errorf("producer not found, clientName:%v", clientName) + + cw := getClientWrap(clientName...) + if cw == nil { + err = producerNotFoundErr + return + } + producer := cw.producer if producer == nil { - err = errors.Errorf("producer not found, clientName:%v", clientName) + err = producerNotFoundErr return } start := time.Now() - topicCountVec.WithLabelValues(message.Topic).Inc() + cw.countVec.WithLabelValues(message.Topic).Inc() _, _, err = producer.SendMessage(message) duration := time.Since(start).Milliseconds() - sendDurationsHistogram.Observe(float64(duration)) + cw.histogram.Observe(float64(duration)) if err != nil { sentry.CaptureException(errors.WithMessage(err, fmt.Sprint("kafka send message to topic:", message.Topic))) } diff --git a/pkg/tracingx/config.go b/pkg/tracingx/config.go index 1a80c62..bf573f8 100644 --- a/pkg/tracingx/config.go +++ b/pkg/tracingx/config.go @@ -39,9 +39,7 @@ func Config(v *viper.Viper) { cfg.Sampler.Param = 1 } - var metricsFactory metrics.Factory - metricsFactory = prometheus.New().Namespace(metrics.NSOptions{Name: serviceName, Tags: nil}) - metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: cfg.ServiceName, Tags: nil}) + metricsFactory := prometheus.New().Namespace(metrics.NSOptions{Name: "xservice", Tags: map[string]string{"service": serviceName}}) tracer, closer, err := cfg.NewTracer( config.Logger(&jaegerLoggerAdapter{}), diff --git a/tools/xservice/model/mysql/table.go b/tools/xservice/model/mysql/table.go index 312e0eb..bf5a820 100644 --- a/tools/xservice/model/mysql/table.go +++ b/tools/xservice/model/mysql/table.go @@ -55,6 +55,7 @@ var ( "char": "string", "date": "time.Time", "datetime": "time.Time", + "bit": "int8", "bit(1)": "int8", "tinyint": "int8", "tinyint unsigned": "uint8", @@ -82,6 +83,7 @@ var ( {`^(smallint)[(]\d+[)]`, "int16"}, {`^(int)[(]\d+[)] unsigned`, "uint"}, {`^(int)[(]\d+[)]`, "int"}, + {`^(bit)[(]\d+[)]`, "int8"}, {`^(bigint)[(]\d+[)] unsigned`, "uint64"}, {`^(bigint)[(]\d+[)]`, "int64"}, {`^(char)[(]\d+[)]`, "string"},