Skip to content

Commit

Permalink
[Disk Manager] cleanup lister metrics when collectListerMetricsTask t…
Browse files Browse the repository at this point in the history
…ask ends (#2509)

* [Disk Manager] cleanup lister metrics when collectListerMetricsTask task ends

* [Disk Manager] improve collectListerMetricsTask task

* [Disk Manager] improve collectListerMetricsTask task

* [Disk Manager] improve collectListerMetricsTask task

* [Disk Manager] minor improvement

* [Disk Manager] minor improvement

* [Disk Manager] minor improvement
  • Loading branch information
gy2411 authored Nov 20, 2024
1 parent 62b7989 commit 2186011
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 59 deletions.
97 changes: 40 additions & 57 deletions cloud/tasks/collect_lister_metrics_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,69 +40,35 @@ func (c *collectListerMetricsTask) Run(
execCtx ExecutionContext,
) error {

taskStatuses := []string{
storage.TaskStatusToString(storage.TaskStatusReadyToRun),
storage.TaskStatusToString(storage.TaskStatusRunning),
storage.TaskStatusToString(storage.TaskStatusReadyToCancel),
storage.TaskStatusToString(storage.TaskStatusCancelling),
}
defer c.cleanupMetrics(taskStatuses)

ticker := time.NewTicker(c.metricsCollectionInterval)
defer ticker.Stop()

for range ticker.C {
err := c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
return c.storage.ListTasksReadyToRun(
ctx,
^uint64(0), // limit
nil,
)
},
storage.TaskStatusToString(storage.TaskStatusReadyToRun),
)
if err != nil {
return err
}

err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
return c.storage.ListTasksRunning(
ctx,
^uint64(0), // limit
)
},
storage.TaskStatusToString(storage.TaskStatusRunning),
)
if err != nil {
return err
}

err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
return c.storage.ListTasksReadyToCancel(
ctx,
^uint64(0), // limit
nil,
)
},
storage.TaskStatusToString(storage.TaskStatusReadyToCancel),
)
if err != nil {
return err
}

err = c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
return c.storage.ListTasksCancelling(
ctx,
^uint64(0), // limit
)
},
storage.TaskStatusToString(storage.TaskStatusCancelling),
)
if err != nil {
return err
for _, taskStatus := range taskStatuses {
err := c.collectTasksMetrics(
ctx,
func(context.Context) ([]storage.TaskInfo, error) {
return c.storage.ListTasksWithStatus(
ctx,
taskStatus,
)
},
taskStatus,
)
if err != nil {
return err
}
}

err = c.collectHangingTasksMetrics(ctx)
err := c.collectHangingTasksMetrics(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -228,3 +194,20 @@ func (c *collectListerMetricsTask) collectHangingTasksMetrics(

return nil
}

func (c *collectListerMetricsTask) cleanupMetrics(taskStatuses []string) {
sensors := append(taskStatuses, totalHangingTaskCountGaugeName)

for _, taskType := range c.taskTypes {
subRegistry := c.registry.WithTags(map[string]string{
"type": taskType,
})
for _, sensor := range sensors {
subRegistry.Gauge(sensor).Set(float64(0))
}
}

for _, gauge := range c.hangingTaskGaugesByID {
gauge.Set(float64(0))
}
}
23 changes: 22 additions & 1 deletion cloud/tasks/storage/compound_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,13 +240,34 @@ func (s *compoundStorage) ListTasksCancelling(

tasks := []TaskInfo{}
err := s.visit(ctx, func(storage Storage) error {
values, err := storage.ListTasksCancelling(ctx, limit)
values, err := storage.ListTasksCancelling(
ctx,
limit,
)
tasks = append(tasks, values...)
return err
})
return tasks, err
}

func (s *compoundStorage) ListTasksWithStatus(
ctx context.Context,
status string,
) ([]TaskInfo, error) {

tasks := []TaskInfo{}
err := s.visit(ctx, func(storage Storage) error {
values, err := storage.ListTasksWithStatus(
ctx,
status,
)
tasks = append(tasks, values...)
return err
})

return tasks, err
}

func (s *compoundStorage) ListHangingTasks(
ctx context.Context,
limit uint64,
Expand Down
10 changes: 10 additions & 0 deletions cloud/tasks/storage/mocks/storage_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ func (s *StorageMock) ListTasksCancelling(
return res, args.Error(1)
}

func (s *StorageMock) ListTasksWithStatus(
ctx context.Context,
status string,
) ([]tasks_storage.TaskInfo, error) {

args := s.Called(ctx, status)
res, _ := args.Get(0).([]tasks_storage.TaskInfo)
return res, args.Error(1)
}

func (s *StorageMock) ListHangingTasks(
ctx context.Context,
limit uint64,
Expand Down
11 changes: 10 additions & 1 deletion cloud/tasks/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,18 @@ type Storage interface {
taskTypeWhitelist []string,
) ([]TaskInfo, error)

// Used for SRE tools.
// Used for SRE tools and metrics collection.
ListTasksRunning(ctx context.Context, limit uint64) ([]TaskInfo, error)
ListTasksCancelling(ctx context.Context, limit uint64) ([]TaskInfo, error)

// Used for metrics collection.
// Wraps some of the ListTasks* methods above.
ListTasksWithStatus(
ctx context.Context,
status string,
) ([]TaskInfo, error)

// Used for SRE tools.
ListHangingTasks(ctx context.Context, limit uint64) ([]TaskInfo, error)
ListFailedTasks(ctx context.Context, since time.Time) ([]string, error)
ListSlowTasks(ctx context.Context, since time.Time, estimateMiss time.Duration) ([]string, error)
Expand Down
25 changes: 25 additions & 0 deletions cloud/tasks/storage/storage_ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/persistence"
)

Expand Down Expand Up @@ -250,6 +251,30 @@ func (s *storageYDB) ListTasksCancelling(
return tasks, err
}

func (s *storageYDB) ListTasksWithStatus(
ctx context.Context,
status string,
) ([]TaskInfo, error) {

limit := ^uint64(0)

switch status {
case TaskStatusToString(TaskStatusReadyToRun):
return s.ListTasksReadyToRun(ctx, limit, nil /* taskTypeWhitelist */)
case TaskStatusToString(TaskStatusReadyToCancel):
return s.ListTasksReadyToCancel(ctx, limit, nil /* taskTypeWhitelist */)
case TaskStatusToString(TaskStatusRunning):
return s.ListTasksRunning(ctx, limit)
case TaskStatusToString(TaskStatusCancelling):
return s.ListTasksCancelling(ctx, limit)
default:
return nil, errors.NewNonRetriableErrorf(
"listing of tasks with status %s is not supported",
status,
)
}
}

func (s *storageYDB) ListHangingTasks(
ctx context.Context,
limit uint64,
Expand Down
95 changes: 95 additions & 0 deletions cloud/tasks/tasks_tests/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1350,3 +1350,98 @@ func TestHangingTasksMetricsAreSetEvenForTasksNotRegisteredForExecution(t *testi
<-gaugeSetChannel
registry.AssertAllExpectations(t)
}

func TestListerMetricsCleanup(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
require.NoError(t, err)
// Do not defer db.Close(ctx) in this test.

registry := mocks.NewIgnoreUnknownCallsRegistryMock()

config := newHangingTaskTestConfig()

s := createServicesWithConfig(t, ctx, db, config, registry)
err = registerHangingTask(s.registry)
require.NoError(t, err)

err = s.startRunners(ctx)
require.NoError(t, err)

reqCtx := getRequestContext(t, ctx)
taskID, err := scheduleHangingTask(reqCtx, s.scheduler)
require.NoError(t, err)

gaugeSetWg := sync.WaitGroup{}
gaugeUnsetWg := sync.WaitGroup{}

runningTaskCountGaugeSetCall := registry.GetGauge(
"running",
map[string]string{"type": "tasks.hanging"},
).On(
"Set",
float64(1),
).Return(mock.Anything)

totalHangingTaskCountGaugeSetCall := registry.GetGauge(
"totalHangingTaskCount",
map[string]string{"type": "tasks.hanging"},
).On(
"Set",
float64(1),
).Return(mock.Anything)

gaugeSetWg.Add(1)
hangingTasksGaugeSetCall := registry.GetGauge(
"hangingTasks",
map[string]string{"type": "tasks.hanging", "id": taskID},
).On("Set", float64(1)).Return(mock.Anything).Run(
func(args mock.Arguments) {
gaugeSetWg.Done()
},
)

registry.GetGauge(
"running",
map[string]string{"type": "tasks.hanging"},
).On(
"Set",
float64(0),
).NotBefore(
runningTaskCountGaugeSetCall,
).Return(mock.Anything)

registry.GetGauge(
"totalHangingTaskCount",
map[string]string{"type": "tasks.hanging"},
).On(
"Set",
float64(0),
).NotBefore(
totalHangingTaskCountGaugeSetCall,
).Return(mock.Anything)

gaugeUnsetWg.Add(1)
registry.GetGauge(
"hangingTasks",
map[string]string{"type": "tasks.hanging", "id": taskID},
).On(
"Set",
float64(0),
).NotBefore(
hangingTasksGaugeSetCall,
).Return(mock.Anything).Run(
func(args mock.Arguments) {
gaugeUnsetWg.Done()
},
)

gaugeSetWg.Wait()
// Close connection to YDB to enforce collectListerMetricsTask failure.
err = db.Close(ctx)
require.NoError(t, err)
gaugeUnsetWg.Wait()
registry.AssertAllExpectations(t)
}

0 comments on commit 2186011

Please sign in to comment.