Skip to content

Commit

Permalink
[Disk Manager] check number of inflight tasks properly in test (#2862)
Browse files Browse the repository at this point in the history
  • Loading branch information
gy2411 authored Jan 17, 2025
1 parent 8f56e59 commit d73c679
Showing 1 changed file with 65 additions and 21 deletions.
86 changes: 65 additions & 21 deletions cloud/tasks/tasks_tests/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -179,31 +180,57 @@ func createServicesWithConfig(
}
}

func createServices(
func createServicesWithMetricsRegistry(
t *testing.T,
ctx context.Context,
db *persistence.YDBClient,
runnersCount uint64,
metricsRegistry metrics.Registry,
) services {

config := proto.Clone(newDefaultConfig()).(*tasks_config.TasksConfig)
config.RunnersCount = &runnersCount
config.StalkingRunnersCount = &runnersCount

return createServicesWithConfig(
t,
ctx,
db,
config,
metricsRegistry,
)
}

func createServices(
t *testing.T,
ctx context.Context,
db *persistence.YDBClient,
runnersCount uint64,
) services {

return createServicesWithMetricsRegistry(
t,
ctx,
db,
runnersCount,
metrics_empty.NewRegistry(),
)
}

func (s *services) startRunners(ctx context.Context) error {
return s.startRunnersWithMetricsRegistry(ctx, metrics_empty.NewRegistry())
}

func (s *services) startRunnersWithMetricsRegistry(
ctx context.Context,
metricsRegistry metrics.Registry,
) error {

return tasks.StartRunners(
ctx,
s.storage,
s.registry,
metrics_empty.NewRegistry(),
metricsRegistry,
s.config,
"localhost",
)
Expand Down Expand Up @@ -703,23 +730,50 @@ func TestTasksRunningOneTask(t *testing.T) {
require.EqualValues(t, 2*123, response)
}

func TestTasksRunningLimit(t *testing.T) {
func TestTasksInflightLimit(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())
defer cancel()

db, err := newYDB(ctx)
require.NoError(t, err)
defer db.Close(ctx)

s := createServices(t, ctx, db, 3*inflightLongTaskPerNodeLimit)
registry := mocks.NewIgnoreUnknownCallsRegistryMock()
defer registry.AssertAllExpectations(t)

s := createServicesWithMetricsRegistry(
t,
ctx,
db,
3*inflightLongTaskPerNodeLimit, // runnersCount
registry,
)

err = registerDoublerTask(s.registry)
require.NoError(t, err)

err = registerLongTask(s.registry)
require.NoError(t, err)

err = s.startRunners(ctx)
var inflightLongTasksCount atomic.Int32
registry.GetGauge(
"inflightTasks",
map[string]string{"type": "long"},
).On("Add", float64(1)).Return().Run(
func(args mock.Arguments) {
inflightLongTasksCount.Add(1)
},
)
registry.GetGauge(
"inflightTasks",
map[string]string{"type": "long"},
).On("Add", float64(-1)).Return().Run(
func(args mock.Arguments) {
inflightLongTasksCount.Add(-1)
},
)

err = s.startRunnersWithMetricsRegistry(ctx, registry)
require.NoError(t, err)

longTaskIDs := []string{}
Expand Down Expand Up @@ -769,26 +823,16 @@ func TestTasksRunningLimit(t *testing.T) {
for {
select {
case <-ticker.C:
runningTasks, _ := s.storage.ListTasksRunning(
ctx,
uint64(scheduledLongTaskCount),
)
require.NoError(t, err)

logging.Debug(ctx, "Listed running tasks: %v+", runningTasks)

runningLongTaskCount := 0
for _, task := range runningTasks {
if task.TaskType == "long" {
runningLongTaskCount++
}
}

// Note that inflight task is not the same as task with status 'running'.
// The task with status 'running' might not be executing right now
// in case of retriable or interrupt execution error.
count := int(inflightLongTasksCount.Load())
logging.Debug(ctx, "There are %v inflight tasks of type long", count)
// We have separate inflight per node limit for each lister.
// So long tasks can be taken by listerReadyToRun or
// listerStallingWhileRunning lister.
// Thus we need to compare runningLongTaskCount with doubled inflightLongTaskPerNodeLimit.
require.LessOrEqual(t, runningLongTaskCount, 2*inflightLongTaskPerNodeLimit)
require.LessOrEqual(t, count, 2*inflightLongTaskPerNodeLimit)
case err := <-doublerTaskErrs:
require.NoError(t, err)
endedDoublerTaskCount++
Expand Down

0 comments on commit d73c679

Please sign in to comment.