Skip to content

Commit

Permalink
improve lister
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Dec 2, 2024
1 parent c4247d6 commit 0a25cd0
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 11 deletions.
2 changes: 2 additions & 0 deletions cloud/tasks/config/config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ message TasksConfig {
// Warning: if this flag is false, then information about ended tasks
// will not be cleared from the database.
optional bool RegularSystemTasksEnabled = 33 [default = true];

optional uint64 TasksToListLimit = 34 [default = 10000];
}
7 changes: 4 additions & 3 deletions cloud/tasks/lister.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type listTasksFunc = func(
) ([]storage.TaskInfo, error)

type lister struct {
tasksToListLimit uint64
listTasks listTasksFunc
channels []*channel
pollForTasksPeriodMin time.Duration
Expand Down Expand Up @@ -64,9 +65,7 @@ func (l *lister) loop(ctx context.Context) {
continue
}

limit := channelsLen - inflightTaskCount

tasks, err := l.listTasks(ctx, uint64(limit))
tasks, err := l.listTasks(ctx, l.tasksToListLimit)
if err == nil {
logging.Debug(ctx, "lister listed %v tasks", len(tasks))

Expand Down Expand Up @@ -170,6 +169,7 @@ func (l *lister) getInflightTaskCount() uint32 {

func newLister(
ctx context.Context,
tasksToListLimit uint64,
listTasks listTasksFunc,
channelsCount uint64,
pollForTasksPeriodMin time.Duration,
Expand All @@ -178,6 +178,7 @@ func newLister(
) *lister {

lister := &lister{
tasksToListLimit: tasksToListLimit,
listTasks: listTasks,
channels: make([]*channel, channelsCount),
pollForTasksPeriodMin: pollForTasksPeriodMin,
Expand Down
4 changes: 4 additions & 0 deletions cloud/tasks/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,7 @@ func StartRunners(

listerReadyToRun := newLister(
ctx,
config.GetTasksToListLimit(),
func(ctx context.Context, limit uint64) ([]storage.TaskInfo, error) {
return taskStorage.ListTasksReadyToRun(
ctx,
Expand All @@ -914,6 +915,7 @@ func StartRunners(
)
listerReadyToCancel := newLister(
ctx,
config.GetTasksToListLimit(),
func(ctx context.Context, limit uint64) ([]storage.TaskInfo, error) {
return taskStorage.ListTasksReadyToCancel(
ctx,
Expand Down Expand Up @@ -951,6 +953,7 @@ func StartRunners(

listerStallingWhileRunning := newLister(
ctx,
config.GetTasksToListLimit(),
func(ctx context.Context, limit uint64) ([]storage.TaskInfo, error) {
return taskStorage.ListTasksStallingWhileRunning(
ctx,
Expand All @@ -966,6 +969,7 @@ func StartRunners(
)
listerStallingWhileCancelling := newLister(
ctx,
config.GetTasksToListLimit(),
func(ctx context.Context, limit uint64) ([]storage.TaskInfo, error) {
return taskStorage.ListTasksStallingWhileCancelling(
ctx,
Expand Down
18 changes: 10 additions & 8 deletions cloud/tasks/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,13 +1421,14 @@ func testListerLoop(
}
lister := newLister(
ctx,
100, // tasksToListLimit
func(ctx context.Context, limit uint64) ([]storage.TaskInfo, error) {
return tasks, nil
},
uint64(channelCount),
50*time.Millisecond,
100*time.Millisecond,
make(map[string]int64),
uint64(channelCount), // channelsCount
50*time.Millisecond, // pollForTasksPeriodMin
100*time.Millisecond, // pollForTasksPeriodMax
make(map[string]int64), // inflightTaskLimits
)

receivedTasks := make([]storage.TaskInfo, 0)
Expand Down Expand Up @@ -1497,13 +1498,14 @@ func TestListerLoopCancellingWhileReceiving(t *testing.T) {
}
lister := newLister(
ctx,
100, // tasksToListLimit
func(ctx context.Context, limit uint64) ([]storage.TaskInfo, error) {
return tasks, nil
},
uint64(channelCount),
50*time.Millisecond,
100*time.Millisecond,
make(map[string]int64),
uint64(channelCount), // channelsCount
50*time.Millisecond, // pollForTasksPeriodMin
100*time.Millisecond, // pollForTasksPeriodMax
make(map[string]int64), // inflightTaskLimits
)

receivedTasks := make([]storage.TaskInfo, 0)
Expand Down

0 comments on commit 0a25cd0

Please sign in to comment.