Skip to content

Commit

Permalink
Add a management command to run a configured periodic job immediately (
Browse files Browse the repository at this point in the history
…cvat-ai#8909)

This can be useful for testing.
  • Loading branch information
SpecLad authored Jan 8, 2025
1 parent 331ff86 commit b594b1c
Showing 1 changed file with 23 additions and 0 deletions.
23 changes: 23 additions & 0 deletions cvat/apps/engine/management/commands/runperiodicjob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from argparse import ArgumentParser

from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.utils.module_loading import import_string


class Command(BaseCommand):
help = "Run a configured periodic job immediately"

def add_arguments(self, parser: ArgumentParser) -> None:
parser.add_argument("job_id", help="ID of the job to run")

def handle(self, *args, **options):
job_id = options["job_id"]

for job_definition in settings.PERIODIC_RQ_JOBS:
if job_definition["id"] == job_id:
job_func = import_string(job_definition["func"])
job_func()
return

raise CommandError(f"Job with ID {job_id} not found")

0 comments on commit b594b1c

Please sign in to comment.