Skip to content

Commit

Permalink
fix middleware & metric name
Browse files Browse the repository at this point in the history
  • Loading branch information
yinheli committed Jul 2, 2021
1 parent 8368b54 commit 1f4ad46
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 35 deletions.
10 changes: 6 additions & 4 deletions core/xservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"

Expand Down Expand Up @@ -191,10 +192,9 @@ func (t *serverImpl) waitSignalForTableflip(upg *tableflip.Upgrader) {
}

func (t *serverImpl) initEcho() {
e := t.newEcho()
e := t.newEcho("http")
echox.ConfigValidator(e)

e.Use(middleware.Trace(t.options.Config.GetBool("jaeger.body_dump"), t.options.EchoTracingSkipper))
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
e.Group("/debug/*", middleware.Pprof())

Expand Down Expand Up @@ -236,15 +236,15 @@ func (t *serverImpl) initGrpc() {
))

// echo instance for grpc-gateway, which wrap another echo instance, for gRPC service not found fallback serve
e := t.newEcho()
e := t.newEcho("grpc_gateway")
e.Use(echo.WrapMiddleware(func(handler http.Handler) http.Handler {
return t.grpcGateway
}))

t.httpHandler = e
}

func (t *serverImpl) newEcho() *echo.Echo {
func (t *serverImpl) newEcho(subsystem string) *echo.Echo {
e := echo.New()

e.Logger = log.NewEchoLogger()
Expand All @@ -266,6 +266,8 @@ func (t *serverImpl) newEcho() *echo.Echo {

e.Use(echomd.RequestID())
e.Use(sentryecho.New(sentryecho.Options{Repanic: true}))
e.Use(middleware.Trace(t.options.Config.GetBool("jaeger.body_dump"), t.options.EchoTracingSkipper))
e.Use(middleware.Prometheus(strings.ReplaceAll(t.options.Name, "-", "_"), subsystem))

// logger id & traceId & server-info
e.Use(func(next echo.HandlerFunc) echo.HandlerFunc {
Expand Down
67 changes: 39 additions & 28 deletions pkg/kafkax/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strings"
"time"

"github.com/Shopify/sarama"
Expand All @@ -21,9 +22,7 @@ import (
)

var (
clients map[string]*clientWrapper
topicCountVec *prometheus.CounterVec
sendDurationsHistogram prometheus.Histogram
clients map[string]*clientWrapper
)

type config struct {
Expand All @@ -33,9 +32,11 @@ type config struct {
}

type clientWrapper struct {
client sarama.Client
name string
producer sarama.SyncProducer
client sarama.Client
name string
producer sarama.SyncProducer
countVec *prometheus.CounterVec
histogram prometheus.Histogram
}

func Config(v *viper.Viper) {
Expand All @@ -46,21 +47,6 @@ func Config(v *viper.Viper) {

serviceName := os.Getenv(core.EnvServiceName)

topicCountVec = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: serviceName,
Subsystem: "kafka",
Name: "send_total",
Help: "Number of kafka message sent",
}, []string{"topic"})

sendDurationsHistogram = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: serviceName,
Subsystem: "kafka",
Name: "send_duration_millisecond",
Help: "Send duration",
Buckets: []float64{20, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000},
})

clients = make(map[string]*clientWrapper, len(configs))

hostname, _ := os.Hostname()
Expand Down Expand Up @@ -90,10 +76,28 @@ func Config(v *viper.Viper) {
log.Fatal("init kafka producer error", zap.Error(err), zap.String("name", c.Name))
}

subsystem := fmt.Sprintf("kafka_%s", strings.ReplaceAll(c.Name, "-", "_"))
countVec := promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: serviceName,
Subsystem: subsystem,
Name: "send_total",
Help: "Number of kafka message sent",
}, []string{"topic"})

histogram := promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: serviceName,
Subsystem: subsystem,
Name: "send_duration_millisecond",
Help: "Send duration",
Buckets: []float64{20, 50, 100, 200, 300, 500, 1000, 2000, 3000, 5000},
})

clients[c.Name] = &clientWrapper{
name: c.Name,
client: client,
producer: producer,
name: c.Name,
client: client,
producer: producer,
countVec: countVec,
histogram: histogram,
}
}
}
Expand Down Expand Up @@ -137,16 +141,23 @@ func SendMessage(ctx context.Context, message *sarama.ProducerMessage, clientNam
}()
}

producer := GetProducer(clientName...)
producerNotFoundErr := errors.Errorf("producer not found, clientName:%v", clientName)

cw := getClientWrap(clientName...)
if cw == nil {
err = producerNotFoundErr
return
}
producer := cw.producer
if producer == nil {
err = errors.Errorf("producer not found, clientName:%v", clientName)
err = producerNotFoundErr
return
}
start := time.Now()
topicCountVec.WithLabelValues(message.Topic).Inc()
cw.countVec.WithLabelValues(message.Topic).Inc()
_, _, err = producer.SendMessage(message)
duration := time.Since(start).Milliseconds()
sendDurationsHistogram.Observe(float64(duration))
cw.histogram.Observe(float64(duration))
if err != nil {
sentry.CaptureException(errors.WithMessage(err, fmt.Sprint("kafka send message to topic:", message.Topic)))
}
Expand Down
4 changes: 1 addition & 3 deletions pkg/tracingx/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,7 @@ func Config(v *viper.Viper) {
cfg.Sampler.Param = 1
}

var metricsFactory metrics.Factory
metricsFactory = prometheus.New().Namespace(metrics.NSOptions{Name: serviceName, Tags: nil})
metricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: cfg.ServiceName, Tags: nil})
metricsFactory := prometheus.New().Namespace(metrics.NSOptions{Name: "xservice", Tags: map[string]string{"service": serviceName}})

tracer, closer, err := cfg.NewTracer(
config.Logger(&jaegerLoggerAdapter{}),
Expand Down
2 changes: 2 additions & 0 deletions tools/xservice/model/mysql/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ var (
"char": "string",
"date": "time.Time",
"datetime": "time.Time",
"bit": "int8",
"bit(1)": "int8",
"tinyint": "int8",
"tinyint unsigned": "uint8",
Expand Down Expand Up @@ -82,6 +83,7 @@ var (
{`^(smallint)[(]\d+[)]`, "int16"},
{`^(int)[(]\d+[)] unsigned`, "uint"},
{`^(int)[(]\d+[)]`, "int"},
{`^(bit)[(]\d+[)]`, "int8"},
{`^(bigint)[(]\d+[)] unsigned`, "uint64"},
{`^(bigint)[(]\d+[)]`, "int64"},
{`^(char)[(]\d+[)]`, "string"},
Expand Down

0 comments on commit 1f4ad46

Please sign in to comment.