diff --git a/changelog.d/20240718_202907_mzhiltso_fix_export_job_retries.md b/changelog.d/20240718_202907_mzhiltso_fix_export_job_retries.md
new file mode 100644
index 000000000000..ee8fb3429ee9
--- /dev/null
+++ b/changelog.d/20240718_202907_mzhiltso_fix_export_job_retries.md
@@ -0,0 +1,4 @@
+### Fixed
+
+- Export and export cache clean rq job retries' hangs
+  ()
diff --git a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py
index 3d14e3cbe518..767fb07fe962 100644
--- a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py
+++ b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py
@@ -1833,7 +1833,6 @@ def test_export_can_request_retry_on_locking_failure(self):
             mock_get_export_cache_lock.assert_called()
 
         self.assertEqual(mock_rq_job.retries_left, 1)
-        self.assertEqual(len(mock_rq_job.retry_intervals), 1)
 
     def test_export_can_reuse_older_file_if_still_relevant(self):
         format_name = "CVAT for images 1.1"
@@ -1926,7 +1925,6 @@ def test_cleanup_can_request_retry_on_locking_failure(self):
             mock_get_export_cache_lock.assert_called()
 
         self.assertEqual(mock_rq_job.retries_left, 1)
-        self.assertEqual(len(mock_rq_job.retry_intervals), 1)
         self.assertTrue(osp.isfile(export_path))
 
     def test_cleanup_can_fail_if_no_file(self):
@@ -1969,7 +1967,6 @@ def test_cleanup_can_defer_removal_if_file_is_used_recently(self):
            clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock())
 
         self.assertEqual(mock_rq_job.retries_left, 1)
-        self.assertEqual(len(mock_rq_job.retry_intervals), 1)
         self.assertTrue(osp.isfile(export_path))
 
     def test_cleanup_can_be_called_with_old_signature_and_values(self):
diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py
index cb9e4f0f674a..35e40c8c03a3 100644
--- a/cvat/apps/dataset_manager/views.py
+++ b/cvat/apps/dataset_manager/views.py
@@ -13,11 +13,13 @@
 import rq
 from django.conf import settings
 from django.utils import timezone
+from rq_scheduler import Scheduler
 
 import cvat.apps.dataset_manager.project as project
 import cvat.apps.dataset_manager.task as task
 from cvat.apps.engine.log import ServerLogManager
 from cvat.apps.engine.models import Job, Project, Task
+from cvat.apps.engine.utils import get_rq_lock_by_user
 
 from .formats.registry import EXPORT_FORMATS, IMPORT_FORMATS
 from .util import (
@@ -58,6 +60,42 @@ def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:
 
     return TTL_CONSTS[db_instance.lower()]
 
 
+def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
+    # TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler
+    # for better reliability and error reporting
+
+    # This implementation can potentially lead to 2 jobs with the same name running in parallel,
+    # if the retry is enqueued immediately.
+    assert time_delta.total_seconds() > 0
+
+    current_rq_job = rq.get_current_job()
+
+    def _patched_retry(*_1, **_2):
+        scheduler: Scheduler = django_rq.get_scheduler(
+            settings.CVAT_QUEUES.EXPORT_DATA.value
+        )
+
+        user_id = current_rq_job.meta.get('user', {}).get('id') or -1
+
+        with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id):
+            scheduler.enqueue_in(
+                time_delta,
+                current_rq_job.func,
+                *current_rq_job.args,
+                **current_rq_job.kwargs,
+                job_id=current_rq_job.id,
+                meta=current_rq_job.meta,
+                depends_on=current_rq_job.dependency_ids,
+                job_ttl=current_rq_job.ttl,
+                job_result_ttl=current_rq_job.result_ttl,
+                job_description=current_rq_job.description,
+                on_success=current_rq_job.success_callback,
+                on_failure=current_rq_job.failure_callback,
+            )
+
+    current_rq_job.retries_left = 1
+    setattr(current_rq_job, 'retry', _patched_retry)
+    return current_rq_job
 def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
     try:
@@ -109,7 +147,9 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
                 server_url=server_url, save_images=save_images)
             os.replace(temp_file, output_path)
 
-        scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
+        scheduler: Scheduler = django_rq.get_scheduler(
+            settings.CVAT_QUEUES.EXPORT_DATA.value
+        )
         cleaning_job = scheduler.enqueue_in(
             time_delta=cache_ttl,
             func=clear_export_cache,
@@ -122,7 +162,9 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
             "and available for downloading for the next {}. "
             "Export cache cleaning job is enqueued, id '{}'".format(
                 db_instance.__class__.__name__.lower(),
-                db_instance.name if isinstance(db_instance, (Project, Task)) else db_instance.id,
+                db_instance.name if isinstance(
+                    db_instance, (Project, Task)
+                ) else db_instance.id,
                 dst_format, output_path, cache_ttl, cleaning_job.id
             )
         )
@@ -131,10 +173,13 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
         return output_path
     except LockNotAvailableError:
         # Need to retry later if the lock was not available
-        rq_job = rq.get_current_job() # the worker references the same object
-        rq_job.retries_left = 1
-        rq_job.retry_intervals = [EXPORT_LOCKED_RETRY_INTERVAL.total_seconds()]
-        raise # should be handled by the worker
+        _retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL)
+        logger.info(
+            "Failed to acquire export cache lock. Retrying in {}".format(
+                EXPORT_LOCKED_RETRY_INTERVAL
+            )
+        )
+        raise
     except Exception:
         log_exception(logger)
         raise
@@ -179,9 +224,7 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger
 
             if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds():
                 # Need to retry later, the export is in use
-                rq_job = rq.get_current_job() # the worker references the same object
-                rq_job.retries_left = 1
-                rq_job.retry_intervals = [cache_ttl.total_seconds()]
+                _retry_current_rq_job(cache_ttl)
                 logger.info(
                     "Export cache file '{}' is recently accessed, will retry in {}".format(
                         file_path, cache_ttl
@@ -194,10 +237,13 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger
             logger.info("Export cache file '{}' successfully removed".format(file_path))
     except LockNotAvailableError:
         # Need to retry later if the lock was not available
-        rq_job = rq.get_current_job() # the worker references the same object
-        rq_job.retries_left = 1
-        rq_job.retry_intervals = [EXPORT_LOCKED_RETRY_INTERVAL.total_seconds()]
-        raise # should be handled by the worker
+        _retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL)
+        logger.info(
+            "Failed to acquire export cache lock. Retrying in {}".format(
+                EXPORT_LOCKED_RETRY_INTERVAL
+            )
+        )
+        raise
     except Exception:
         log_exception(logger)
         raise