Skip to content

Commit

Permalink
Refactor a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Marishka17 committed Jan 9, 2025
1 parent 7a1db7b commit 476c688
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 123 deletions.
8 changes: 8 additions & 0 deletions changelog.d/20241224_150942_maria_clear_cache_cron_job.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
### Added

- Setting `TMP_FILE_OR_DIR_RETENTION_DAYS`, which defines the maximum retention period
of a file or directory in the temporary directory
(<https://github.com/cvat-ai/cvat/pull/8804>)
- Cron job to remove outdated files and directories from the CVAT tmp directory
(<https://github.com/cvat-ai/cvat/pull/8804>)

### Changed

- Export cache cleaning moved to a separate cron job
Expand Down
65 changes: 29 additions & 36 deletions cvat/apps/dataset_manager/cron.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2024 CVAT.ai Corporation
# Copyright (C) 2025 CVAT.ai Corporation
#
# SPDX-License-Identifier: MIT

Expand All @@ -23,9 +23,7 @@
from cvat.apps.dataset_manager.util import (
CacheFileOrDirPathParseError,
ExportCacheManager,
OperationType,
TmpDirManager,
TmpEntityType,
get_export_cache_lock,
)
from cvat.apps.dataset_manager.views import (
Expand Down Expand Up @@ -57,10 +55,10 @@ def clear_export_cache(file_path: Path) -> bool:
acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT,
ttl=EXPORT_CACHE_LOCK_TTL,
):
parsed_filename = ExportCacheManager.parse_file_path(file_path)
parsed_filename = ExportCacheManager.parse_filename(file_path.name)
cache_ttl = get_export_cache_ttl(parsed_filename.instance_type)

if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds():
if timezone.now().timestamp() <= file_path.stat().st_mtime + cache_ttl.total_seconds():
logger.debug(f"Export cache file {file_path.name!r} was recently accessed")
return False

Expand All @@ -69,39 +67,19 @@ def clear_export_cache(file_path: Path) -> bool:
return True


def remove_tmp_dir(dir_path: str) -> bool:
    """Remove an outdated temporary export directory.

    Returns True if the directory was removed, False if it is still within
    its TTL and was kept.

    No locks are taken while handling temporary directories, because the
    undesired race conditions cannot occur here:
    1. A directory may disappear while its name is parsed or its modification
       time is checked; the resulting exception is expected and is handled by
       the cron job.
    2. A worker removes a directory only once it is outdated.
    3. Each temporary directory has a unique name, so one process can never be
       creating a directory that another process is simultaneously removing.
    """
    parsed = TmpDirManager.parse_tmp_child(dir_path)
    assert parsed.operation == OperationType.EXPORT
    assert parsed.type == TmpEntityType.DIR

    ttl_seconds = get_export_cache_ttl(parsed.instance_type).total_seconds()
    last_modified = osp.getmtime(dir_path)

    # Keep the directory if it was modified within the TTL window.
    if timezone.now().timestamp() <= last_modified + ttl_seconds:
        return False

    shutil.rmtree(dir_path)
    logger.debug(f"Temporary directory {dir_path} was successfully removed")
    return True


class BaseCleanupThread(Thread, metaclass=ABCMeta):
description: ClassVar[str]

def __init__(self, stop_event: Event, *args, **kwargs) -> None:
self._stop_event = stop_event
self._removed_entities = 0
self._number_of_removed_objects = 0
self._exception = None
super().__init__(*args, **kwargs, target=self._cleanup)

@property
def removed_entities(self) -> int:
return self._removed_entities
def number_of_removed_objects(self) -> int:
return self._number_of_removed_objects

@abstractmethod
def _cleanup(self) -> None: ...
Expand All @@ -120,18 +98,33 @@ class CleanupTmpDirThread(BaseCleanupThread):

@suppress_exceptions
def _cleanup(self) -> None:
for dir_path in TmpDirManager.get_export_related_dirs():
# We do not use locks here when handling objects from the tmp directory,
# because the undesired race conditions cannot occur here:
# 1. A temporary file/directory can be removed while its access time is being checked.
#    In that case an exception is expected and is handled by the cron process.
# 2. A temporary file/directory is removed by the cron job only when it is outdated.
# 3. Each temporary file/directory has a unique name, so a race between one process
#    creating an object and another removing it is impossible.
for child in os.scandir(TmpDirManager.TMP_ROOT):
# stop the cleanup process gracefully before the RQ job timeout expires
if self._stop_event.is_set():
return

try:
if remove_tmp_dir(dir_path):
self._removed_entities += 1
except CacheFileOrDirPathParseError:
logger.warning(f"Cannot parse {dir_path.name}, skipping...")
if (
child.stat().st_atime + timedelta(
days=TmpDirManager.TMP_FILE_OR_DIR_RETENTION_DAYS
).total_seconds() < timezone.now().timestamp()
):
if child.is_dir():
shutil.rmtree(child.path)
else:
os.remove(child.path)
logger.debug(f"The {child.name} was successfully removed")
self._number_of_removed_objects += 1
except FileNotFoundError:
# file or directory has been removed by another process
continue

except Exception:
log_exception(logger)

Expand All @@ -156,7 +149,7 @@ def _cleanup(self) -> None:

try:
if clear_export_cache(child):
self._removed_entities += 1
self._number_of_removed_objects += 1
except CacheFileOrDirPathParseError:
logger.warning(f"Cannot parse {child.name}, skipping...")
continue
Expand Down Expand Up @@ -196,5 +189,5 @@ def cleanup(thread_class_path: str) -> None:
logger.info(
f"The {cleanup_thread.description!r} process has been successfully "
f"completed after {int((finished_at - started_at).total_seconds())} seconds. "
f"{cleanup_thread.removed_entities} elements have been removed"
f"{cleanup_thread.number_of_removed_objects} elements have been removed"
)
5 changes: 2 additions & 3 deletions cvat/apps/dataset_manager/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError
from django.conf import settings
from django.db import transaction
from django.utils import timezone

from cvat.apps.dataset_manager.task import TaskAnnotation
from cvat.apps.dataset_manager.util import TmpDirManager
Expand Down Expand Up @@ -156,7 +155,7 @@ def export(
)

with (
TmpDirManager.get_tmp_export_dir(
TmpDirManager.get_tmp_directory_for_export(
instance_type=self.db_project.__class__.__name__,
) if not temp_dir else nullcontext(temp_dir)
) as temp_dir:
Expand All @@ -174,7 +173,7 @@ def import_dataset(self, dataset_file, importer, **options):
)
project_data.soft_attribute_import = True

with TmpDirManager.get_tmp_dir() as temp_dir:
with TmpDirManager.get_tmp_directory() as temp_dir:
try:
importer(dataset_file, temp_dir, project_data, load_data_callback=self.load_dataset_data, **options)
except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found:
Expand Down
9 changes: 4 additions & 5 deletions cvat/apps/dataset_manager/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from django.conf import settings
from django.db import transaction
from django.db.models.query import Prefetch, QuerySet
from django.utils import timezone
from rest_framework.exceptions import ValidationError

from cvat.apps.dataset_manager.annotation import AnnotationIR, AnnotationManager
Expand Down Expand Up @@ -786,7 +785,7 @@ def export(
)

with (
TmpDirManager.get_tmp_export_dir(
TmpDirManager.get_tmp_directory_for_export(
instance_type=self.db_job.__class__.__name__,
) if not temp_dir else nullcontext(temp_dir)
) as temp_dir:
Expand All @@ -800,7 +799,7 @@ def import_annotations(self, src_file, importer, **options):
)
self.delete()

with TmpDirManager.get_tmp_dir() as temp_dir:
with TmpDirManager.get_tmp_directory() as temp_dir:
try:
importer(src_file, temp_dir, job_data, **options)
except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found:
Expand Down Expand Up @@ -1001,7 +1000,7 @@ def export(
)

with (
TmpDirManager.get_tmp_export_dir(
TmpDirManager.get_tmp_directory_for_export(
instance_type=self.db_task.__class__.__name__,
) if not temp_dir else nullcontext(temp_dir)
) as temp_dir:
Expand All @@ -1015,7 +1014,7 @@ def import_annotations(self, src_file, importer, **options):
)
self.delete()

with TmpDirManager.get_tmp_dir() as temp_dir:
with TmpDirManager.get_tmp_directory() as temp_dir:
try:
importer(src_file, temp_dir, task_data, **options)
except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found:
Expand Down
15 changes: 8 additions & 7 deletions cvat/apps/dataset_manager/tests/test_rest_api_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Any, Callable, ClassVar, Optional, overload
from unittest.mock import DEFAULT as MOCK_DEFAULT
from unittest.mock import MagicMock, patch
from pathlib import Path

import av
import numpy as np
Expand Down Expand Up @@ -1520,7 +1521,7 @@ def _clear(*_, file_path: str):
side_effect(set_condition, clear_removed_the_file),
)

clear_export_cache(file_path=file_path)
clear_export_cache(file_path=Path(file_path))
set_condition(clear_has_been_finished)

mock_os_remove.assert_not_called()
Expand Down Expand Up @@ -1699,7 +1700,7 @@ def _clear(*_, file_path: str):

exited_by_timeout = False
try:
clear_export_cache(file_path=file_path)
clear_export_cache(file_path=Path(file_path))
except LockNotAvailableError:
# should come from waiting for get_export_cache_lock
exited_by_timeout = True
Expand Down Expand Up @@ -2027,15 +2028,15 @@ def test_cleanup_can_remove_file(self):
patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}),
):
export_path = export(dst_format=format_name, task_id=task_id)
clear_export_cache(file_path=export_path)
clear_export_cache(file_path=Path(export_path))

self.assertFalse(osp.isfile(export_path))


def test_cleanup_can_fail_if_no_file(self):
from cvat.apps.dataset_manager.util import CacheFileOrDirPathParseError
with self.assertRaises(CacheFileOrDirPathParseError):
clear_export_cache(file_path="non existent file path")
clear_export_cache(file_path=Path("non existent file path"))

def test_cleanup_can_defer_removal_if_file_is_used_recently(self):
from os import remove as original_remove
Expand All @@ -2050,13 +2051,13 @@ def test_cleanup_can_defer_removal_if_file_is_used_recently(self):
patch("cvat.apps.dataset_manager.cron.os.remove", side_effect=original_remove) as mock_os_remove,
):
export_path = export(dst_format=format_name, task_id=task_id)
clear_export_cache(file_path=export_path)
clear_export_cache(file_path=Path(export_path))
mock_os_remove.assert_not_called()

self.assertTrue(osp.isfile(export_path))

def test_cleanup_cron_job_can_delete_cached_files(self):
from cvat.apps.dataset_manager.cron import cron_export_cache_cleanup
from cvat.apps.dataset_manager.cron import cleanup

def _get_project_task_job_ids():
project = self._create_project(projects["main"])
Expand Down Expand Up @@ -2100,7 +2101,7 @@ def _get_project_task_job_ids():
):
mock_rq_job = MagicMock(timeout=100)
mock_rq_get_current_job.return_value = mock_rq_job
cron_export_cache_cleanup()
cleanup('cvat.apps.dataset_manager.cron.CleanupExportCacheThread')
mock_clear_export_cache.assert_called_once()

self.assertFalse(osp.exists(export_path))
Expand Down
Loading

0 comments on commit 476c688

Please sign in to comment.