From 2084f86615e2776793e699d321dad62aa1fa3f10 Mon Sep 17 00:00:00 2001 From: sabevzenko Date: Thu, 11 Jul 2024 17:47:13 +0300 Subject: [PATCH] new monitoring metric - engine_LastMaxActiveRequests 58de492a9e1d14a374ca14ef471701c1fdbc8cac --- .changes/v0.5.30.md | 3 ++ .mapping.json | 2 ++ CHANGELOG.md | 4 +++ cli/cli.go | 4 +-- cli/expvar.go | 14 +++----- components/guns/http/base_test.go | 12 +------ core/engine/engine.go | 11 ++++++ core/engine/engine_test.go | 46 ++++++++++-------------- core/engine/instance.go | 2 ++ core/engine/instance_test.go | 16 ++++----- lib/monitoring/instance.go | 48 ++++++++++++++++++++++++++ tests/acceptance/common.go | 11 ------ tests/acceptance/connect_test.go | 2 +- tests/acceptance/grpc_test.go | 4 +-- tests/acceptance/http_scenario_test.go | 2 +- tests/acceptance/http_test.go | 2 +- 16 files changed, 108 insertions(+), 75 deletions(-) create mode 100644 .changes/v0.5.30.md create mode 100644 lib/monitoring/instance.go diff --git a/.changes/v0.5.30.md b/.changes/v0.5.30.md new file mode 100644 index 000000000..ee4821736 --- /dev/null +++ b/.changes/v0.5.30.md @@ -0,0 +1,3 @@ +## v0.5.30 - 2024-07-10 +### Added +* new monitoring metric - engine_LastMaxActiveRequests diff --git a/.mapping.json b/.mapping.json index 22bdea19b..75764fe3a 100644 --- a/.mapping.json +++ b/.mapping.json @@ -27,6 +27,7 @@ ".changes/v0.5.27.md":"load/projects/pandora/.changes/v0.5.27.md", ".changes/v0.5.28.md":"load/projects/pandora/.changes/v0.5.28.md", ".changes/v0.5.29.md":"load/projects/pandora/.changes/v0.5.29.md", + ".changes/v0.5.30.md":"load/projects/pandora/.changes/v0.5.30.md", ".changie.yaml":"load/projects/pandora/.changie.yaml", ".github/actions/setup-yc/action.yml":"load/projects/pandora/.github/actions/setup-yc/action.yml", ".github/workflows/pages.yml":"load/projects/pandora/.github/workflows/pages.yml", @@ -466,6 +467,7 @@ "lib/math/gcd_lcm.go":"load/projects/pandora/lib/math/gcd_lcm.go", "lib/math/gcd_lcm_test.go":"load/projects/pandora/lib/math/gcd_lcm_test.go", "lib/monitoring/counter.go":"load/projects/pandora/lib/monitoring/counter.go", + "lib/monitoring/instance.go":"load/projects/pandora/lib/monitoring/instance.go", "lib/mp/iterator.go":"load/projects/pandora/lib/mp/iterator.go", "lib/mp/map.go":"load/projects/pandora/lib/mp/map.go", "lib/mp/map_test.go":"load/projects/pandora/lib/mp/map_test.go", diff --git a/CHANGELOG.md b/CHANGELOG.md index 15fdc93d0..fbbc1eb8c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,10 @@ adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html), and is generated by [Changie](https://github.com/miniscruff/changie). +## v0.5.30 - 2024-07-10 +### Added +* new monitoring metric - engine_LastMaxActiveRequests + ## v0.5.29 - 2024-06-25 ### Added * HTTP scenario var/header postprocessor use multiple pipes diff --git a/cli/cli.go b/cli/cli.go index 9a0e82813..0e0082e7f 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -25,7 +25,7 @@ import ( "go.uber.org/zap/zapcore" ) -const Version = "0.5.29" +const Version = "0.5.30" const defaultConfigFile = "load" const stdinConfigSelector = "-" @@ -122,7 +122,7 @@ func ReadConfigAndRunEngine() { closeMonitoring := startMonitoring(conf.Monitoring) defer closeMonitoring() - m := newEngineMetrics() + m := engine.NewMetrics("engine") startReport(m) pandora := engine.New(log, m, conf.Engine) diff --git a/cli/expvar.go b/cli/expvar.go index 054ffe9f1..605bcc7e4 100644 --- a/cli/expvar.go +++ b/cli/expvar.go @@ -8,20 +8,12 @@ import ( "go.uber.org/zap" ) -func newEngineMetrics() engine.Metrics { - return engine.Metrics{ - Request: monitoring.NewCounter("engine_Requests"), - Response: monitoring.NewCounter("engine_Responses"), - InstanceStart: monitoring.NewCounter("engine_UsersStarted"), - InstanceFinish: monitoring.NewCounter("engine_UsersFinished"), - } -} - func startReport(m engine.Metrics) { evReqPS := monitoring.NewCounter("engine_ReqPS") evResPS := monitoring.NewCounter("engine_ResPS") evActiveUsers := monitoring.NewCounter("engine_ActiveUsers") evActiveRequests := monitoring.NewCounter("engine_ActiveRequests") + evLastMaxActiveRequests := monitoring.NewCounter("engine_LastMaxActiveRequests") requests := m.Request.Get() responses := m.Response.Get() go func() { @@ -36,15 +28,17 @@ func startReport(m engine.Metrics) { reqps := requestsNew - requests activeUsers := m.InstanceStart.Get() - m.InstanceFinish.Get() activeRequests := requestsNew - responsesNew + lastMaxActiveRequests := int64(m.BusyInstances.Flush()) zap.S().Infof( "[ENGINE] %d resp/s; %d req/s; %d users; %d active\n", - rps, reqps, activeUsers, activeRequests) + rps, reqps, activeUsers, lastMaxActiveRequests) requests = requestsNew responses = responsesNew evActiveUsers.Set(activeUsers) evActiveRequests.Set(activeRequests) + evLastMaxActiveRequests.Set(lastMaxActiveRequests) evReqPS.Set(reqps) evResPS.Set(rps) } diff --git a/components/guns/http/base_test.go b/components/guns/http/base_test.go index 1ece3fca9..b11d5e9d3 100644 --- a/components/guns/http/base_test.go +++ b/components/guns/http/base_test.go @@ -19,7 +19,6 @@ import ( "github.com/yandex/pandora/core/aggregator/netsample" "github.com/yandex/pandora/core/coretest" "github.com/yandex/pandora/core/engine" - "github.com/yandex/pandora/lib/monitoring" "github.com/yandex/pandora/lib/testutil" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -35,15 +34,6 @@ func newLogger() *zap.Logger { return log } -func newEngineMetrics(prefix string) engine.Metrics { - return engine.Metrics{ - Request: monitoring.NewCounter(prefix + "_Requests"), - Response: monitoring.NewCounter(prefix + "_Responses"), - InstanceStart: monitoring.NewCounter(prefix + "_UsersStarted"), - InstanceFinish: monitoring.NewCounter(prefix + "_UsersFinished"), - } -} - func TestGunSuite(t *testing.T) { suite.Run(t, new(BaseGunSuite)) } @@ -59,7 +49,7 @@ type BaseGunSuite struct { func (s *BaseGunSuite) SetupSuite() { s.log = testutil.NewLogger() - s.metrics = newEngineMetrics("http_suite") + s.metrics = engine.NewMetrics("http_suite") } func (s *BaseGunSuite) SetupTest() { diff --git a/core/engine/engine.go b/core/engine/engine.go index 6d5476d86..bfb4b67f7 100644 --- a/core/engine/engine.go +++ b/core/engine/engine.go @@ -29,6 +29,16 @@ type InstancePoolConfig struct { DiscardOverflow bool `config:"discard_overflow"` } +func NewMetrics(prefix string) Metrics { + return Metrics{ + Request: monitoring.NewCounter(prefix + "_Requests"), + Response: monitoring.NewCounter(prefix + "_Responses"), + InstanceStart: monitoring.NewCounter(prefix + "_UsersStarted"), + InstanceFinish: monitoring.NewCounter(prefix + "_UsersFinished"), + BusyInstances: monitoring.NewInstanceTracker(prefix + "_BusyInstances"), + } +} + // TODO(skipor): use something github.com/rcrowley/go-metrics based. // Its high level primitives like Meter can be not fast enough, but EWMAs // and Counters should good for that. @@ -37,6 +47,7 @@ type Metrics struct { Response *monitoring.Counter InstanceStart *monitoring.Counter InstanceFinish *monitoring.Counter + BusyInstances *monitoring.InstanceTracker } func New(log *zap.Logger, m Metrics, conf Config) *Engine { diff --git a/core/engine/engine_test.go b/core/engine/engine_test.go index 76deea9b9..aaa983f32 100644 --- a/core/engine/engine_test.go +++ b/core/engine/engine_test.go @@ -17,7 +17,6 @@ import ( coremock "github.com/yandex/pandora/core/mocks" "github.com/yandex/pandora/core/provider" "github.com/yandex/pandora/core/schedule" - "github.com/yandex/pandora/lib/monitoring" "go.uber.org/atomic" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -83,15 +82,15 @@ func Test_InstancePool(t *testing.T) { waitDoneCalled.Store(false) ctx, cancel = context.WithCancel(context.Background()) } - var justBeforeEach = func() { - metrics := newTestMetrics() + var justBeforeEach = func(metricPrefix string) { + metrics := NewMetrics(metricPrefix) p = newPool(newNopLogger(), metrics, onWaitDone, conf) } _ = cancel t.Run("shoot ok", func(t *testing.T) { beforeEach() - justBeforeEach() + justBeforeEach("shoot-ok") err := p.Run(ctx) require.NoError(t, err) @@ -121,7 +120,7 @@ func Test_InstancePool(t *testing.T) { beforeEach() beforeEachContext() - justBeforeEach() + justBeforeEach("context-canceled") err := p.Run(ctx) require.Equal(t, context.Canceled, err) @@ -170,7 +169,7 @@ func Test_InstancePool(t *testing.T) { }) conf.Aggregator = aggr - justBeforeEach() + justBeforeEach("provider-failed") err := p.Run(ctx) require.Error(t, err) @@ -201,7 +200,7 @@ func Test_InstancePool(t *testing.T) { aggr := &coremock.Aggregator{} aggr.On("Run", mock.Anything, mock.Anything).Return(failErr) conf.Aggregator = aggr - justBeforeEach() + justBeforeEach("aggregator-failed") err := p.Run(ctx) require.Error(t, err) @@ -227,7 +226,7 @@ func Test_InstancePool(t *testing.T) { conf.NewGun = func() (core.Gun, error) { return nil, failErr } - justBeforeEach() + justBeforeEach("start-instances-failed") err := p.Run(ctx) require.Error(t, err) @@ -259,7 +258,7 @@ func Test_MultipleInstance(t *testing.T) { schedule.NewOnce(2), schedule.NewConst(1, 5*time.Second), ) - pool := newPool(newNopLogger(), newTestMetrics(), nil, conf) + pool := newPool(newNopLogger(), NewMetrics("test_engine_1"), nil, conf) ctx := context.Background() err := pool.Run(ctx) @@ -274,7 +273,7 @@ func Test_MultipleInstance(t *testing.T) { return schedule.NewOnce(1), nil } conf.StartupSchedule = schedule.NewOnce(3) - pool := newPool(newNopLogger(), newTestMetrics(), nil, conf) + pool := newPool(newNopLogger(), NewMetrics("test_engine_2"), nil, conf) ctx := context.Background() err := pool.Run(ctx) @@ -291,7 +290,7 @@ func Test_MultipleInstance(t *testing.T) { schedule.NewOnce(2), schedule.NewConst(1, 2*time.Second), ) - pool := newPool(newNopLogger(), newTestMetrics(), nil, conf) + pool := newPool(newNopLogger(), NewMetrics("test_engine_3"), nil, conf) ctx := context.Background() err := pool.Run(ctx) @@ -319,14 +318,14 @@ func Test_Engine(t *testing.T) { ctx, cancel = context.WithCancel(context.Background()) } - var justBeforeEach = func() { - metrics := newTestMetrics() + var justBeforeEach = func(metricPrefix string) { + metrics := NewMetrics(metricPrefix) engine = New(newNopLogger(), metrics, Config{confs}) } t.Run("shoot ok", func(t *testing.T) { beforeEach() - justBeforeEach() + justBeforeEach("shoot-ok-2") err := engine.Run(ctx) require.NoError(t, err) @@ -361,7 +360,7 @@ func Test_Engine(t *testing.T) { } beforeEach() beforeEachCtx() - justBeforeEach() + justBeforeEach("context-canceled-2") err := engine.Run(ctx) require.Equal(t, err, context.Canceled) @@ -398,7 +397,7 @@ func Test_Engine(t *testing.T) { aggr.On("Run", mock.Anything, mock.Anything).Return(failErr) confs[0].Aggregator = aggr - justBeforeEach() + justBeforeEach("one-pool-failed") err := engine.Run(ctx) require.Error(t, err) @@ -411,7 +410,7 @@ func Test_BuildInstanceSchedule(t *testing.T) { t.Run("per instance schedule", func(t *testing.T) { conf, _ := newTestPoolConf() conf.RPSPerInstance = true - pool := newPool(newNopLogger(), newTestMetrics(), nil, conf) + pool := newPool(newNopLogger(), NewMetrics("per-instance-schedule"), nil, conf) newInstanceSchedule, err := pool.buildNewInstanceSchedule(context.Background(), func() { panic("should not be called") }) @@ -428,7 +427,7 @@ func Test_BuildInstanceSchedule(t *testing.T) { conf.NewRPSSchedule = func() (core.Schedule, error) { return nil, scheduleCreateErr } - pool := newPool(newNopLogger(), newTestMetrics(), nil, conf) + pool := newPool(newNopLogger(), NewMetrics("shared-schedule-create-failed"), nil, conf) newInstanceSchedule, err := pool.buildNewInstanceSchedule(context.Background(), func() { panic("should not be called") }) @@ -446,7 +445,7 @@ func Test_BuildInstanceSchedule(t *testing.T) { newScheduleCalled = true return schedule.NewOnce(1), nil } - pool := newPool(newNopLogger(), newTestMetrics(), nil, conf) + pool := newPool(newNopLogger(), NewMetrics("shared-schedule-work"), nil, conf) ctx, cancel := context.WithCancel(context.Background()) newInstanceSchedule, err := pool.buildNewInstanceSchedule(context.Background(), cancel) require.NoError(t, err) @@ -532,12 +531,3 @@ func newNopLogger() *zap.Logger { log := zap.New(core) return log } - -func newTestMetrics() Metrics { - return Metrics{ - &monitoring.Counter{}, - &monitoring.Counter{}, - &monitoring.Counter{}, - &monitoring.Counter{}, - } -} diff --git a/core/engine/instance.go b/core/engine/instance.go index 965f64512..44956a148 100644 --- a/core/engine/instance.go +++ b/core/engine/instance.go @@ -88,6 +88,8 @@ func (i *instance) Run(ctx context.Context) (recoverErr error) { } if !i.discardOverflow || !waiter.IsSlowDown(ctx) { i.metrics.Request.Add(1) + i.metrics.BusyInstances.OnStart(i.id) + defer i.metrics.BusyInstances.OnFinish(i.id) if tag.Debug { i.log.Debug("Shooting", zap.Any("ammo", ammo)) } diff --git a/core/engine/instance_test.go b/core/engine/instance_test.go index fa02cadd0..b9158457f 100644 --- a/core/engine/instance_test.go +++ b/core/engine/instance_test.go @@ -32,7 +32,7 @@ func Test_Instance(t *testing.T) { newGun func() (core.Gun, error) ) - var beforeEach = func() { + var beforeEach = func(metricPrefix string) { provider = &coremock.Provider{} aggregator = &coremock.Aggregator{} gun = &coremock.Gun{} @@ -40,7 +40,7 @@ func Test_Instance(t *testing.T) { sched = &coremock.Schedule{} newScheduleErr = nil ctx = context.Background() - metrics = newTestMetrics() + metrics = NewMetrics(metricPrefix) newSchedule = func() (core.Schedule, error) { return sched, newScheduleErr } newGun = func() (core.Gun, error) { return gun, newGunErr } } @@ -85,7 +85,7 @@ func Test_Instance(t *testing.T) { require.NoError(t, insCreateErr) } t.Run("start ok", func(t *testing.T) { - beforeEach() + beforeEach("start-ok") beforeEachCtx() justBeforeEachCtx() justBeforeEach() @@ -99,7 +99,7 @@ func Test_Instance(t *testing.T) { }) t.Run("gun implements io.Closer / close called on instance close", func(t *testing.T) { - beforeEach() + beforeEach("gun-implements-io") beforeEachCtx() closeGun := mockGunCloser{gun} closeGun.On("Close").Return(nil) @@ -122,7 +122,7 @@ func Test_Instance(t *testing.T) { }) t.Run("context canceled after run / start fail", func(t *testing.T) { - beforeEach() + beforeEach("context-canceled-after-run") var cancel context.CancelFunc ctx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond) @@ -146,7 +146,7 @@ func Test_Instance(t *testing.T) { }) t.Run("context canceled before run / nothing acquired and schedule not started", func(t *testing.T) { - beforeEach() + beforeEach("context-canceled-before-run") var cancel context.CancelFunc ctx, cancel = context.WithCancel(ctx) cancel() @@ -162,7 +162,7 @@ func Test_Instance(t *testing.T) { }) t.Run("schedule create failed / instance create failed", func(t *testing.T) { - beforeEach() + beforeEach("schedule-create-failed") sched = nil newScheduleErr = errors.New("test err") justBeforeEach() @@ -173,7 +173,7 @@ func Test_Instance(t *testing.T) { }) t.Run("gun create failed / instance create failed", func(t *testing.T) { - beforeEach() + beforeEach("gun-create-failed") gun = nil newGunErr = errors.New("test err") justBeforeEach() diff --git a/lib/monitoring/instance.go b/lib/monitoring/instance.go new file mode 100644 index 000000000..bd4346e3c --- /dev/null +++ b/lib/monitoring/instance.go @@ -0,0 +1,48 @@ +package monitoring + +import ( + "expvar" + "strconv" + "sync" +) + +const defaultInstCapacity = 10000 + +func NewInstanceTracker(name string) *InstanceTracker { + v := &InstanceTracker{ids: make(map[int]struct{}, defaultInstCapacity)} + expvar.Publish(name, v) + return v +} + +type InstanceTracker struct { + mu sync.Mutex + ids map[int]struct{} + max int +} + +func (u *InstanceTracker) String() string { + u.mu.Lock() + defer u.mu.Unlock() + return strconv.Itoa(len(u.ids)) +} + +func (u *InstanceTracker) OnStart(id int) { + u.mu.Lock() + defer u.mu.Unlock() + u.ids[id] = struct{}{} + u.max = max(u.max, len(u.ids)) +} + +func (u *InstanceTracker) OnFinish(id int) { + u.mu.Lock() + defer u.mu.Unlock() + delete(u.ids, id) +} + +func (u *InstanceTracker) Flush() int { + u.mu.Lock() + defer u.mu.Unlock() + res := u.max + u.max = len(u.ids) + return res +} diff --git a/tests/acceptance/common.go b/tests/acceptance/common.go index ca774bf49..018e928b1 100644 --- a/tests/acceptance/common.go +++ b/tests/acceptance/common.go @@ -12,8 +12,6 @@ import ( "github.com/yandex/pandora/cli" "github.com/yandex/pandora/core" "github.com/yandex/pandora/core/config" - "github.com/yandex/pandora/core/engine" - "github.com/yandex/pandora/lib/monitoring" "gopkg.in/yaml.v2" ) @@ -47,15 +45,6 @@ func unmarshalConfigFile(t *testing.T, filename string, serverAddr string) map[s return mapCfg } -func newEngineMetrics(prefix string) engine.Metrics { - return engine.Metrics{ - Request: monitoring.NewCounter(prefix + "_Requests"), - Response: monitoring.NewCounter(prefix + "_Responses"), - InstanceStart: monitoring.NewCounter(prefix + "_UsersStarted"), - InstanceFinish: monitoring.NewCounter(prefix + "_UsersFinished"), - } -} - type aggregator struct { mx sync.Mutex samples []core.Sample diff --git a/tests/acceptance/connect_test.go b/tests/acceptance/connect_test.go index 56c285dde..cb6e38faf 100644 --- a/tests/acceptance/connect_test.go +++ b/tests/acceptance/connect_test.go @@ -37,7 +37,7 @@ func (s *ConnectGunSuite) SetupSuite() { }) s.log = testutil.NewNullLogger() - s.metrics = newEngineMetrics("connect_suite") + s.metrics = engine.NewMetrics("connect_suite") } func (s *ConnectGunSuite) Test_Connect() { diff --git a/tests/acceptance/grpc_test.go b/tests/acceptance/grpc_test.go index cefe5e33b..0fdf8398e 100644 --- a/tests/acceptance/grpc_test.go +++ b/tests/acceptance/grpc_test.go @@ -36,7 +36,7 @@ func TestCheckGRPCReflectServer(t *testing.T) { grpcimport.Import(fs) }) pandoraLogger := testutil.NewNullLogger() - pandoraMetrics := newEngineMetrics("reflect") + pandoraMetrics := engine.NewMetrics("reflect") baseFile, err := os.ReadFile("testdata/grpc/base.yaml") require.NoError(t, err) @@ -222,7 +222,7 @@ func (s *GrpcGunSuite) SetupSuite() { }) s.log = testutil.NewNullLogger() - s.metrics = newEngineMetrics("grpc_suite") + s.metrics = engine.NewMetrics("grpc_suite") } func (s *GrpcGunSuite) Test_Run() { diff --git a/tests/acceptance/http_scenario_test.go b/tests/acceptance/http_scenario_test.go index 154a4525a..0681114a6 100644 --- a/tests/acceptance/http_scenario_test.go +++ b/tests/acceptance/http_scenario_test.go @@ -40,7 +40,7 @@ func (s *HTTPScenarioSuite) SetupSuite() { }) s.log = testutil.NewNullLogger() - s.metrics = newEngineMetrics("http_scenario_suite") + s.metrics = engine.NewMetrics("http_scenario_suite") logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) port := os.Getenv("PORT") // TODO: how to set free port in CI? diff --git a/tests/acceptance/http_test.go b/tests/acceptance/http_test.go index fc2fe8fac..4ae65c750 100644 --- a/tests/acceptance/http_test.go +++ b/tests/acceptance/http_test.go @@ -41,7 +41,7 @@ func (s *PandoraSuite) SetupSuite() { }) s.log = testutil.NewNullLogger() - s.metrics = newEngineMetrics("http_suite") + s.metrics = engine.NewMetrics("http_suite") } func (s *PandoraSuite) Test_Http_Check_Passes() {