Skip to content

Commit

Permalink
Fix export and export cleanup job hangs in scheduled state (#8198)
Browse files Browse the repository at this point in the history
<!-- Raise an issue to propose your change
(https://github.com/cvat-ai/cvat/issues).
It helps to avoid duplication of efforts from multiple independent
contributors.
Discuss your ideas with maintainers to be sure that changes will be
approved and merged.
Read the [Contribution guide](https://docs.cvat.ai/docs/contributing/).
-->

<!-- Provide a general summary of your changes in the Title above -->

### Motivation and context
<!-- Why is this change required? What problem does it solve? If it
fixes an open
issue, please link to the issue here. Describe your changes in detail,
add
screenshots. -->

There are 2 schedulers supported by django_rq and by python RQ:
`rq_scheduler` and a newer, builtin queue scheduler in RQ. rq_scheduler
seems to die slowly in favor of the builtin scheduler. The schedulers
have compatible API, but not the implementation. The existing job retry
implementation relies on `retry()` calls, which, in turn, rely on the
builtin RQ scheduler. CVAT uses rq_scheduler a for some tasks, so it its
executed. The builtin RQ scheduler needs the `--with-scheduler` startup
parameter on the worker processes. Thus, the jobs were hanging in the
scheduled state, as the builtin RQ scheduler was not running on the
queues. As CVAT is currently using rq_scheduler, it's decided to
continue using it to avoid disruption and use of 2 schedulers together.
The implementation in this PR does best efforts to be correct, but it's
has potential problems with multiple same jobs running in parallel.

In future we need to migrate to the builtin RQ scheduler, as it is the
only one maintained as of February 2023.

### How has this been tested?
<!-- Please describe in detail how you tested your changes.
Include details of your testing environment, and the tests you ran to
see how your change affects other areas of the code, etc. -->

### Checklist
<!-- Go over all the following points, and put an `x` in all the boxes
that apply.
If an item isn't applicable for some reason, then ~~explicitly
strikethrough~~ the whole
line. If you don't do that, GitHub will show incorrect progress for the
pull request.
If you're unsure about any of these, don't hesitate to ask. We're here
to help! -->
- [ ] I submit my changes into the `develop` branch
- [ ] I have created a changelog fragment <!-- see top comment in
CHANGELOG.md -->
- [ ] I have updated the documentation accordingly
- [ ] I have added tests to cover my changes
- [ ] I have linked related issues (see [GitHub docs](

https://help.github.com/en/github/managing-your-work-on-github/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword))
- [ ] I have increased versions of npm packages if it is necessary

([cvat-canvas](https://github.com/cvat-ai/cvat/tree/develop/cvat-canvas#versioning),

[cvat-core](https://github.com/cvat-ai/cvat/tree/develop/cvat-core#versioning),

[cvat-data](https://github.com/cvat-ai/cvat/tree/develop/cvat-data#versioning)
and

[cvat-ui](https://github.com/cvat-ai/cvat/tree/develop/cvat-ui#versioning))

### License

- [ ] I submit _my code changes_ under the same [MIT License](
https://github.com/cvat-ai/cvat/blob/develop/LICENSE) that covers the
project.
  Feel free to contact the maintainers if that's a concern.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- **Bug Fixes**
- Improved export and export cache clean operations by adding a retry
mechanism to handle job retries, preventing hangs.

- **Chores**
- Updated internal process for handling job retries using RQ scheduler
for better reliability.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
zhiltsov-max authored Jul 24, 2024
1 parent e981718 commit 41d2f04
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
### Fixed

- Export and export cache clean rq job retries' hangs
(<https://github.com/cvat-ai/cvat/pull/8198>)
3 changes: 0 additions & 3 deletions cvat/apps/dataset_manager/tests/test_rest_api_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,6 @@ def test_export_can_request_retry_on_locking_failure(self):

mock_get_export_cache_lock.assert_called()
self.assertEqual(mock_rq_job.retries_left, 1)
self.assertEqual(len(mock_rq_job.retry_intervals), 1)

def test_export_can_reuse_older_file_if_still_relevant(self):
format_name = "CVAT for images 1.1"
Expand Down Expand Up @@ -1926,7 +1925,6 @@ def test_cleanup_can_request_retry_on_locking_failure(self):

mock_get_export_cache_lock.assert_called()
self.assertEqual(mock_rq_job.retries_left, 1)
self.assertEqual(len(mock_rq_job.retry_intervals), 1)
self.assertTrue(osp.isfile(export_path))

def test_cleanup_can_fail_if_no_file(self):
Expand Down Expand Up @@ -1969,7 +1967,6 @@ def test_cleanup_can_defer_removal_if_file_is_used_recently(self):
clear_export_cache(file_path=export_path, file_ctime=file_ctime, logger=MagicMock())

self.assertEqual(mock_rq_job.retries_left, 1)
self.assertEqual(len(mock_rq_job.retry_intervals), 1)
self.assertTrue(osp.isfile(export_path))

def test_cleanup_can_be_called_with_old_signature_and_values(self):
Expand Down
72 changes: 59 additions & 13 deletions cvat/apps/dataset_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
import rq
from django.conf import settings
from django.utils import timezone
from rq_scheduler import Scheduler

import cvat.apps.dataset_manager.project as project
import cvat.apps.dataset_manager.task as task
from cvat.apps.engine.log import ServerLogManager
from cvat.apps.engine.models import Job, Project, Task
from cvat.apps.engine.utils import get_rq_lock_by_user

from .formats.registry import EXPORT_FORMATS, IMPORT_FORMATS
from .util import (
Expand Down Expand Up @@ -58,6 +60,42 @@ def get_export_cache_ttl(db_instance: str | Project | Task | Job) -> timedelta:

return TTL_CONSTS[db_instance.lower()]

def _retry_current_rq_job(time_delta: timedelta) -> rq.job.Job:
# TODO: implement using retries once we move from rq_scheduler to builtin RQ scheduler
# for better reliability and error reporting

# This implementation can potentially lead to 2 jobs with the same name running in parallel,
# if the retry is enqueued immediately.
assert time_delta.total_seconds() > 0

current_rq_job = rq.get_current_job()

def _patched_retry(*_1, **_2):
scheduler: Scheduler = django_rq.get_scheduler(
settings.CVAT_QUEUES.EXPORT_DATA.value
)

user_id = current_rq_job.meta.get('user', {}).get('id') or -1

with get_rq_lock_by_user(settings.CVAT_QUEUES.EXPORT_DATA.value, user_id):
scheduler.enqueue_in(
time_delta,
current_rq_job.func,
*current_rq_job.args,
**current_rq_job.kwargs,
job_id=current_rq_job.id,
meta=current_rq_job.meta,
depends_on=current_rq_job.dependency_ids,
job_ttl=current_rq_job.ttl,
job_result_ttl=current_rq_job.result_ttl,
job_description=current_rq_job.description,
on_success=current_rq_job.success_callback,
on_failure=current_rq_job.failure_callback,
)

current_rq_job.retries_left = 1
setattr(current_rq_job, 'retry', _patched_retry)
return current_rq_job

def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=None, save_images=False):
try:
Expand Down Expand Up @@ -109,7 +147,9 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
server_url=server_url, save_images=save_images)
os.replace(temp_file, output_path)

scheduler = django_rq.get_scheduler(settings.CVAT_QUEUES.EXPORT_DATA.value)
scheduler: Scheduler = django_rq.get_scheduler(
settings.CVAT_QUEUES.EXPORT_DATA.value
)
cleaning_job = scheduler.enqueue_in(
time_delta=cache_ttl,
func=clear_export_cache,
Expand All @@ -122,7 +162,9 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
"and available for downloading for the next {}. "
"Export cache cleaning job is enqueued, id '{}'".format(
db_instance.__class__.__name__.lower(),
db_instance.name if isinstance(db_instance, (Project, Task)) else db_instance.id,
db_instance.name if isinstance(
db_instance, (Project, Task)
) else db_instance.id,
dst_format, output_path, cache_ttl,
cleaning_job.id
)
Expand All @@ -131,10 +173,13 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
return output_path
except LockNotAvailableError:
# Need to retry later if the lock was not available
rq_job = rq.get_current_job() # the worker references the same object
rq_job.retries_left = 1
rq_job.retry_intervals = [EXPORT_LOCKED_RETRY_INTERVAL.total_seconds()]
raise # should be handled by the worker
_retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL)
logger.info(
"Failed to acquire export cache lock. Retrying in {}".format(
EXPORT_LOCKED_RETRY_INTERVAL
)
)
raise
except Exception:
log_exception(logger)
raise
Expand Down Expand Up @@ -179,9 +224,7 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger

if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds():
# Need to retry later, the export is in use
rq_job = rq.get_current_job() # the worker references the same object
rq_job.retries_left = 1
rq_job.retry_intervals = [cache_ttl.total_seconds()]
_retry_current_rq_job(cache_ttl)
logger.info(
"Export cache file '{}' is recently accessed, will retry in {}".format(
file_path, cache_ttl
Expand All @@ -194,10 +237,13 @@ def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger
logger.info("Export cache file '{}' successfully removed".format(file_path))
except LockNotAvailableError:
# Need to retry later if the lock was not available
rq_job = rq.get_current_job() # the worker references the same object
rq_job.retries_left = 1
rq_job.retry_intervals = [EXPORT_LOCKED_RETRY_INTERVAL.total_seconds()]
raise # should be handled by the worker
_retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL)
logger.info(
"Failed to acquire export cache lock. Retrying in {}".format(
EXPORT_LOCKED_RETRY_INTERVAL
)
)
raise
except Exception:
log_exception(logger)
raise
Expand Down

0 comments on commit 41d2f04

Please sign in to comment.