diff --git a/cloud/tasks/collect_lister_metrics_task.go b/cloud/tasks/collect_lister_metrics_task.go
index 233a7d7da9e..efb5da4094b 100644
--- a/cloud/tasks/collect_lister_metrics_task.go
+++ b/cloud/tasks/collect_lister_metrics_task.go
@@ -40,69 +40,35 @@ func (c *collectListerMetricsTask) Run(
 	execCtx ExecutionContext,
 ) error {
 
+	taskStatuses := []string{
+		storage.TaskStatusToString(storage.TaskStatusReadyToRun),
+		storage.TaskStatusToString(storage.TaskStatusRunning),
+		storage.TaskStatusToString(storage.TaskStatusReadyToCancel),
+		storage.TaskStatusToString(storage.TaskStatusCancelling),
+	}
+	defer c.cleanupMetrics(taskStatuses)
+
 	ticker := time.NewTicker(c.metricsCollectionInterval)
 	defer ticker.Stop()
 
 	for range ticker.C {
-		err := c.collectTasksMetrics(
-			ctx,
-			func(context.Context) ([]storage.TaskInfo, error) {
-				return c.storage.ListTasksReadyToRun(
-					ctx,
-					^uint64(0), // limit
-					nil,
-				)
-			},
-			storage.TaskStatusToString(storage.TaskStatusReadyToRun),
-		)
-		if err != nil {
-			return err
-		}
-
-		err = c.collectTasksMetrics(
-			ctx,
-			func(context.Context) ([]storage.TaskInfo, error) {
-				return c.storage.ListTasksRunning(
-					ctx,
-					^uint64(0), // limit
-				)
-			},
-			storage.TaskStatusToString(storage.TaskStatusRunning),
-		)
-		if err != nil {
-			return err
-		}
-
-		err = c.collectTasksMetrics(
-			ctx,
-			func(context.Context) ([]storage.TaskInfo, error) {
-				return c.storage.ListTasksReadyToCancel(
-					ctx,
-					^uint64(0), // limit
-					nil,
-				)
-			},
-			storage.TaskStatusToString(storage.TaskStatusReadyToCancel),
-		)
-		if err != nil {
-			return err
-		}
-
-		err = c.collectTasksMetrics(
-			ctx,
-			func(context.Context) ([]storage.TaskInfo, error) {
-				return c.storage.ListTasksCancelling(
-					ctx,
-					^uint64(0), // limit
-				)
-			},
-			storage.TaskStatusToString(storage.TaskStatusCancelling),
-		)
-		if err != nil {
-			return err
+		for _, taskStatus := range taskStatuses {
+			err := c.collectTasksMetrics(
+				ctx,
+				func(context.Context) ([]storage.TaskInfo, error) {
+					return c.storage.ListTasksWithStatus(
+						ctx,
+						taskStatus,
+					)
+				},
+				taskStatus,
+			)
+			if err != nil {
+				return err
+			}
 		}
 
-		err = c.collectHangingTasksMetrics(ctx)
+		err := c.collectHangingTasksMetrics(ctx)
 		if err != nil {
 			return err
 		}
@@ -228,3 +194,27 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics(
 
 	return nil
 }
+
+// cleanupMetrics sets to zero, for every task type in c.taskTypes, the gauge
+// named after each entry of taskStatuses and the total hanging task count
+// gauge, and then zeroes every gauge stored in c.hangingTaskGaugesByID.
+func (c *collectListerMetricsTask) cleanupMetrics(taskStatuses []string) {
+	// Build a separate slice rather than appending to taskStatuses directly,
+	// so the caller's backing array is never written to.
+	sensors := make([]string, 0, len(taskStatuses)+1)
+	sensors = append(sensors, taskStatuses...)
+	sensors = append(sensors, totalHangingTaskCountGaugeName)
+
+	for _, taskType := range c.taskTypes {
+		subRegistry := c.registry.WithTags(map[string]string{
+			"type": taskType,
+		})
+		for _, sensor := range sensors {
+			subRegistry.Gauge(sensor).Set(float64(0))
+		}
+	}
+
+	for _, gauge := range c.hangingTaskGaugesByID {
+		gauge.Set(float64(0))
+	}
+}
diff --git a/cloud/tasks/storage/compound_storage.go b/cloud/tasks/storage/compound_storage.go
index 00d982a4835..02924e24ae7 100644
--- a/cloud/tasks/storage/compound_storage.go
+++ b/cloud/tasks/storage/compound_storage.go
@@ -240,13 +240,37 @@ func (s *compoundStorage) ListTasksCancelling(
 
 	tasks := []TaskInfo{}
 	err := s.visit(ctx, func(storage Storage) error {
-		values, err := storage.ListTasksCancelling(ctx, limit)
+		values, err := storage.ListTasksCancelling(
+			ctx,
+			limit,
+		)
 		tasks = append(tasks, values...)
 		return err
 	})
 	return tasks, err
 }
 
+// ListTasksWithStatus calls ListTasksWithStatus on every storage that s.visit
+// passes to the callback and returns the concatenation of their results
+// together with the error reported by s.visit.
+func (s *compoundStorage) ListTasksWithStatus(
+	ctx context.Context,
+	status string,
+) ([]TaskInfo, error) {
+
+	tasks := []TaskInfo{}
+	err := s.visit(ctx, func(storage Storage) error {
+		values, err := storage.ListTasksWithStatus(
+			ctx,
+			status,
+		)
+		tasks = append(tasks, values...)
+		return err
+	})
+
+	return tasks, err
+}
+
 func (s *compoundStorage) ListHangingTasks(
 	ctx context.Context,
 	limit uint64,
diff --git a/cloud/tasks/storage/mocks/storage_mock.go b/cloud/tasks/storage/mocks/storage_mock.go
index e0d400122f7..b78d3f7e3c2 100644
--- a/cloud/tasks/storage/mocks/storage_mock.go
+++ b/cloud/tasks/storage/mocks/storage_mock.go
@@ -118,6 +118,16 @@ func (s *StorageMock) ListTasksCancelling(
 	return res, args.Error(1)
 }
 
+func (s *StorageMock) ListTasksWithStatus(
+	ctx context.Context,
+	status string,
+) ([]tasks_storage.TaskInfo, error) {
+
+	args := s.Called(ctx, status)
+	res, _ := args.Get(0).([]tasks_storage.TaskInfo)
+	return res, args.Error(1)
+}
+
 func (s *StorageMock) ListHangingTasks(
 	ctx context.Context,
 	limit uint64,
diff --git a/cloud/tasks/storage/storage.go b/cloud/tasks/storage/storage.go
index 4f9275ce347..df981eaa31c 100644
--- a/cloud/tasks/storage/storage.go
+++ b/cloud/tasks/storage/storage.go
@@ -320,9 +320,18 @@ type Storage interface {
 		taskTypeWhitelist []string,
 	) ([]TaskInfo, error)
 
-	// Used for SRE tools.
+	// Used for SRE tools and metrics collection.
 	ListTasksRunning(ctx context.Context, limit uint64) ([]TaskInfo, error)
 	ListTasksCancelling(ctx context.Context, limit uint64) ([]TaskInfo, error)
+
+	// Used for metrics collection.
+	// Wraps some of the ListTasks* methods above.
+	ListTasksWithStatus(
+		ctx context.Context,
+		status string,
+	) ([]TaskInfo, error)
+
+	// Used for SRE tools.
 	ListHangingTasks(ctx context.Context, limit uint64) ([]TaskInfo, error)
 	ListFailedTasks(ctx context.Context, since time.Time) ([]string, error)
 	ListSlowTasks(ctx context.Context, since time.Time, estimateMiss time.Duration) ([]string, error)
diff --git a/cloud/tasks/storage/storage_ydb.go b/cloud/tasks/storage/storage_ydb.go
index 65d04587dce..476ec31ad54 100644
--- a/cloud/tasks/storage/storage_ydb.go
+++ b/cloud/tasks/storage/storage_ydb.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"time"
 
+	"github.com/ydb-platform/nbs/cloud/tasks/errors"
 	"github.com/ydb-platform/nbs/cloud/tasks/persistence"
 )
 
@@ -250,6 +251,35 @@ func (s *storageYDB) ListTasksCancelling(
 	return tasks, err
 }
 
+// ListTasksWithStatus dispatches to the ListTasks* method that matches status,
+// where status is compared against the TaskStatusToString form of ReadyToRun,
+// ReadyToCancel, Running and Cancelling. The limit passed on is the maximum
+// uint64 value and the task type whitelist, where one is taken, is nil. Any
+// other status yields a non-retriable error.
+func (s *storageYDB) ListTasksWithStatus(
+	ctx context.Context,
+	status string,
+) ([]TaskInfo, error) {
+
+	limit := ^uint64(0)
+
+	switch status {
+	case TaskStatusToString(TaskStatusReadyToRun):
+		return s.ListTasksReadyToRun(ctx, limit, nil /* taskTypeWhitelist */)
+	case TaskStatusToString(TaskStatusReadyToCancel):
+		return s.ListTasksReadyToCancel(ctx, limit, nil /* taskTypeWhitelist */)
+	case TaskStatusToString(TaskStatusRunning):
+		return s.ListTasksRunning(ctx, limit)
+	case TaskStatusToString(TaskStatusCancelling):
+		return s.ListTasksCancelling(ctx, limit)
+	default:
+		return nil, errors.NewNonRetriableErrorf(
+			"listing of tasks with status %s is not supported",
+			status,
+		)
+	}
+}
+
 func (s *storageYDB) ListHangingTasks(
 	ctx context.Context,
 	limit uint64,
diff --git a/cloud/tasks/tasks_tests/tasks_test.go b/cloud/tasks/tasks_tests/tasks_test.go
index de5edc93b15..abc1b6067e1 100644
--- a/cloud/tasks/tasks_tests/tasks_test.go
+++ b/cloud/tasks/tasks_tests/tasks_test.go
@@ -1350,3 +1350,104 @@ func TestHangingTasksMetricsAreSetEvenForTasksNotRegisteredForExecution(t *testi
 	<-gaugeSetChannel
 	registry.AssertAllExpectations(t)
 }
+
+// TestListerMetricsCleanup expects each gauge that was set to 1 to be set to 0
+// later, and waits for the hangingTasks gauge reset after closing the YDB
+// connection.
+func TestListerMetricsCleanup(t *testing.T) {
+	ctx, cancel := context.WithCancel(newContext())
+	defer cancel()
+
+	db, err := newYDB(ctx)
+	require.NoError(t, err)
+	// Do not defer db.Close(ctx) in this test.
+
+	registry := mocks.NewIgnoreUnknownCallsRegistryMock()
+
+	config := newHangingTaskTestConfig()
+
+	s := createServicesWithConfig(t, ctx, db, config, registry)
+	err = registerHangingTask(s.registry)
+	require.NoError(t, err)
+
+	err = s.startRunners(ctx)
+	require.NoError(t, err)
+
+	reqCtx := getRequestContext(t, ctx)
+	taskID, err := scheduleHangingTask(reqCtx, s.scheduler)
+	require.NoError(t, err)
+
+	gaugeSetWg := sync.WaitGroup{}
+	gaugeUnsetWg := sync.WaitGroup{}
+	// WaitGroup.Done panics if the counter goes negative, so each callback
+	// below signals at most once even if its mocked Set call matches again.
+	var gaugeSetOnce, gaugeUnsetOnce sync.Once
+
+	runningTaskCountGaugeSetCall := registry.GetGauge(
+		"running",
+		map[string]string{"type": "tasks.hanging"},
+	).On(
+		"Set",
+		float64(1),
+	).Return(mock.Anything)
+
+	totalHangingTaskCountGaugeSetCall := registry.GetGauge(
+		"totalHangingTaskCount",
+		map[string]string{"type": "tasks.hanging"},
+	).On(
+		"Set",
+		float64(1),
+	).Return(mock.Anything)
+
+	gaugeSetWg.Add(1)
+	hangingTasksGaugeSetCall := registry.GetGauge(
+		"hangingTasks",
+		map[string]string{"type": "tasks.hanging", "id": taskID},
+	).On("Set", float64(1)).Return(mock.Anything).Run(
+		func(args mock.Arguments) {
+			gaugeSetOnce.Do(gaugeSetWg.Done)
+		},
+	)
+
+	registry.GetGauge(
+		"running",
+		map[string]string{"type": "tasks.hanging"},
+	).On(
+		"Set",
+		float64(0),
+	).NotBefore(
+		runningTaskCountGaugeSetCall,
+	).Return(mock.Anything)
+
+	registry.GetGauge(
+		"totalHangingTaskCount",
+		map[string]string{"type": "tasks.hanging"},
+	).On(
+		"Set",
+		float64(0),
+	).NotBefore(
+		totalHangingTaskCountGaugeSetCall,
+	).Return(mock.Anything)
+
+	gaugeUnsetWg.Add(1)
+	registry.GetGauge(
+		"hangingTasks",
+		map[string]string{"type": "tasks.hanging", "id": taskID},
+	).On(
+		"Set",
+		float64(0),
+	).NotBefore(
+		hangingTasksGaugeSetCall,
+	).Return(mock.Anything).Run(
+		func(args mock.Arguments) {
+			gaugeUnsetOnce.Do(gaugeUnsetWg.Done)
+		},
+	)
+
+	gaugeSetWg.Wait()
+	// Close the connection to YDB to force collectListerMetricsTask to fail.
+	err = db.Close(ctx)
+	require.NoError(t, err)
+	gaugeUnsetWg.Wait()
+	registry.AssertAllExpectations(t)
+}