From 30086dab4ba8c0d3733d2e693bf492d451a64ff0 Mon Sep 17 00:00:00 2001 From: agungdwiprasetyo Date: Wed, 3 Apr 2024 09:08:31 +0700 Subject: [PATCH] task queue worker: set stop all job to background --- .../app/task_queue_worker/graphql_resolver.go | 52 +++++++++---------- init.go | 2 +- 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/codebase/app/task_queue_worker/graphql_resolver.go b/codebase/app/task_queue_worker/graphql_resolver.go index 0e67448..0116b8f 100644 --- a/codebase/app/task_queue_worker/graphql_resolver.go +++ b/codebase/app/task_queue_worker/graphql_resolver.go @@ -185,31 +185,33 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct { input.TaskName, strings.Join(r.engine.tasks, ", ")) } - r.engine.subscriber.broadcastWhenChangeAllJob(r.engine.ctx, input.TaskName, true, "Stopping...") - r.engine.opt.queue.Clear(ctx, input.TaskName) - go r.engine.stopAllJobInTask(input.TaskName) - - incrQuery := map[string]int64{} - affectedStatus := []string{string(StatusQueueing), string(StatusRetrying)} - for _, status := range affectedStatus { - countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx, - &Filter{ - TaskName: input.TaskName, Status: &status, - }, - map[string]interface{}{ - "status": StatusStopped, - }, - ) - if err != nil { - continue + go func(ctx context.Context, taskName string) { + r.engine.subscriber.broadcastWhenChangeAllJob(ctx, taskName, true, "Stopping...") + r.engine.opt.queue.Clear(ctx, taskName) + r.engine.stopAllJobInTask(taskName) + + incrQuery := map[string]int64{} + affectedStatus := []string{string(StatusQueueing), string(StatusRetrying)} + for _, status := range affectedStatus { + countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx, + &Filter{ + TaskName: taskName, Status: &status, + }, + map[string]interface{}{ + "status": StatusStopped, + }, + ) + if err != nil { + continue + } + incrQuery[strings.ToLower(status)] -= countMatchedFilter + incrQuery[strings.ToLower(string(StatusStopped))] += countAffected } - incrQuery[strings.ToLower(status)] -= countMatchedFilter - incrQuery[strings.ToLower(string(StatusStopped))] += countAffected - } - r.engine.subscriber.broadcastWhenChangeAllJob(r.engine.ctx, input.TaskName, false, "") - r.engine.opt.persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery) - r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx) + r.engine.subscriber.broadcastWhenChangeAllJob(ctx, taskName, false, "") + r.engine.opt.persistent.Summary().IncrementSummary(ctx, taskName, incrQuery) + r.engine.subscriber.broadcastAllToSubscribers(ctx) + }(r.engine.ctx, input.TaskName) return "Success stop all job in task " + input.TaskName, nil } @@ -218,7 +220,6 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { Filter FilterMutateJobInputResolver }) (string, error) { go func(ctx context.Context, req *FilterMutateJobInputResolver) { - filter := req.ToFilter() r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Retrying...") @@ -252,7 +253,7 @@ func (r *rootResolver) RetryAllJob(ctx context.Context, input struct { r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, false, "") r.engine.opt.persistent.Summary().IncrementSummary(ctx, filter.TaskName, incr) - r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx) + r.engine.subscriber.broadcastAllToSubscribers(ctx) r.engine.registerNextJob(false, filter.TaskName) }(r.engine.ctx, &input.Filter) @@ -264,7 +265,6 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct { Filter FilterMutateJobInputResolver }) (string, error) { go func(ctx context.Context, req *FilterMutateJobInputResolver) { - filter := req.ToFilter() r.engine.subscriber.broadcastWhenChangeAllJob(ctx, filter.TaskName, true, "Cleaning...") diff --git a/init.go b/init.go index 6d667b6..4050adf 100644 --- a/init.go +++ b/init.go @@ -2,5 +2,5 @@ package candi const ( // Version of this library - Version = "v1.17.0" + Version = "v1.17.1" )