From b594b1cbc7118d64621a78bb4f937ebd44c451f1 Mon Sep 17 00:00:00 2001 From: Roman Donchenko Date: Wed, 8 Jan 2025 16:48:00 +0200 Subject: [PATCH] Add a management command to run a configured periodic job immediately (#8909) This can be useful for testing. --- .../management/commands/runperiodicjob.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) create mode 100644 cvat/apps/engine/management/commands/runperiodicjob.py diff --git a/cvat/apps/engine/management/commands/runperiodicjob.py b/cvat/apps/engine/management/commands/runperiodicjob.py new file mode 100644 index 000000000000..765f16541cfd --- /dev/null +++ b/cvat/apps/engine/management/commands/runperiodicjob.py @@ -0,0 +1,23 @@ +from argparse import ArgumentParser + +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django.utils.module_loading import import_string + + +class Command(BaseCommand): + help = "Run a configured periodic job immediately" + + def add_arguments(self, parser: ArgumentParser) -> None: + parser.add_argument("job_id", help="ID of the job to run") + + def handle(self, *args, **options): + job_id = options["job_id"] + + for job_definition in settings.PERIODIC_RQ_JOBS: + if job_definition["id"] == job_id: + job_func = import_string(job_definition["func"]) + job_func() + return + + raise CommandError(f"Job with ID {job_id} not found")