Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Dec 2, 2024
1 parent 0a25cd0 commit 1cd8d74
Showing 1 changed file with 38 additions and 9 deletions.
47 changes: 38 additions & 9 deletions cloud/tasks/tasks_tests/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (t *longTask) Run(ctx context.Context, execCtx tasks.ExecutionContext) erro
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(1 * time.Second):
case <-time.After(10 * time.Second):
return nil
}
}
Expand Down Expand Up @@ -711,36 +711,61 @@ func TestTasksRunningLimit(t *testing.T) {
require.NoError(t, err)
defer db.Close(ctx)

s := createServices(t, ctx, db, 10*inflightLongTaskPerNodeLimit)
s := createServices(t, ctx, db, 3*inflightLongTaskPerNodeLimit)

err = registerDoublerTask(s.registry)
require.NoError(t, err)

err = registerLongTask(s.registry)
require.NoError(t, err)

err = s.startRunners(ctx)
require.NoError(t, err)

tasksIds := []string{}
longTasksIds := []string{}
scheduledLongTaskCount := 6 * inflightLongTaskPerNodeLimit
for i := 0; i < scheduledLongTaskCount; i++ {
reqCtx := getRequestContext(t, ctx)
id, err := scheduleLongTask(reqCtx, s.scheduler)
require.NoError(t, err)

tasksIds = append(tasksIds, id)
longTasksIds = append(longTasksIds, id)
}

endedLongTaskCount := 0
errs := make(chan error)
for _, id := range tasksIds {
longTasksErrs := make(chan error)
for _, id := range longTasksIds {
go func(id string) {
_, err := waitTask(ctx, s.scheduler, id)
longTasksErrs <- err
}(id)
}

doublerTasksIds := []string{}
scheduledDoublerTaskCount := 2 * inflightLongTaskPerNodeLimit
for i := 0; i < scheduledDoublerTaskCount; i++ {
reqCtx := getRequestContext(t, ctx)
id, err := scheduleDoublerTask(reqCtx, s.scheduler, 1)
require.NoError(t, err)

errs <- err
doublerTasksIds = append(doublerTasksIds, id)
}

endedDoublerTaskCount := 0
doublerTaskErrs := make(chan error)
for _, id := range doublerTasksIds {
go func(id string) {
_, err := waitTaskWithTimeout(
ctx,
s.scheduler,
id,
2*time.Second,
)
doublerTaskErrs <- err
}(id)
}

ticker := time.NewTicker(20 * time.Millisecond)

for {
select {
case <-ticker.C:
Expand All @@ -764,11 +789,15 @@ func TestTasksRunningLimit(t *testing.T) {
// listerStallingWhileRunning lister.
// Thus we need to compare runningLongTaskCount with doubled inflightLongTaskPerNodeLimit.
require.LessOrEqual(t, runningLongTaskCount, 2*inflightLongTaskPerNodeLimit)
case err := <-errs:
case err := <-doublerTaskErrs:
require.NoError(t, err)
endedDoublerTaskCount++
case err := <-longTasksErrs:
require.NoError(t, err)
endedLongTaskCount++

if endedLongTaskCount == scheduledLongTaskCount {
require.EqualValues(t, scheduledDoublerTaskCount, endedDoublerTaskCount)
return
}
}
Expand Down

0 comments on commit 1cd8d74

Please sign in to comment.