From 476c68872b40710f5ac9667aee30f3413358caa9 Mon Sep 17 00:00:00 2001 From: Maria Khrustaleva Date: Thu, 9 Jan 2025 11:29:46 +0100 Subject: [PATCH] Refactor a bit --- ...41224_150942_maria_clear_cache_cron_job.md | 8 ++ cvat/apps/dataset_manager/cron.py | 65 +++++++--------- cvat/apps/dataset_manager/project.py | 5 +- cvat/apps/dataset_manager/task.py | 9 +-- .../tests/test_rest_api_formats.py | 15 ++-- cvat/apps/dataset_manager/util.py | 77 +++---------------- cvat/apps/dataset_manager/views.py | 2 +- cvat/apps/engine/backup.py | 2 +- cvat/apps/engine/tests/test_rest_api.py | 6 +- cvat/settings/base.py | 5 +- dev/format_python_code.sh | 1 + 11 files changed, 72 insertions(+), 123 deletions(-) diff --git a/changelog.d/20241224_150942_maria_clear_cache_cron_job.md b/changelog.d/20241224_150942_maria_clear_cache_cron_job.md index 52ab9046fc97..e56e1debfe9e 100644 --- a/changelog.d/20241224_150942_maria_clear_cache_cron_job.md +++ b/changelog.d/20241224_150942_maria_clear_cache_cron_job.md @@ -1,3 +1,11 @@ +### Added + +- Setting `TMP_FILE_OR_DIR_RETENTION_DAYS`, which defines maximum retention period + of a file or dir in temporary directory + () +- Cron job to remove outdated files and directories from CVAT tmp directory + () + ### Changed - Export cache cleaning moved to a separate cron job diff --git a/cvat/apps/dataset_manager/cron.py b/cvat/apps/dataset_manager/cron.py index 11327f5f9338..fffb4136a39c 100644 --- a/cvat/apps/dataset_manager/cron.py +++ b/cvat/apps/dataset_manager/cron.py @@ -1,4 +1,4 @@ -# Copyright (C) 2024 CVAT.ai Corporation +# Copyright (C) 2025 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -23,9 +23,7 @@ from cvat.apps.dataset_manager.util import ( CacheFileOrDirPathParseError, ExportCacheManager, - OperationType, TmpDirManager, - TmpEntityType, get_export_cache_lock, ) from cvat.apps.dataset_manager.views import ( @@ -57,10 +55,10 @@ def clear_export_cache(file_path: Path) -> bool: acquire_timeout=EXPORT_CACHE_LOCK_ACQUISITION_TIMEOUT, ttl=EXPORT_CACHE_LOCK_TTL, ): - parsed_filename = ExportCacheManager.parse_file_path(file_path) + parsed_filename = ExportCacheManager.parse_filename(file_path.name) cache_ttl = get_export_cache_ttl(parsed_filename.instance_type) - if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds(): + if timezone.now().timestamp() <= file_path.stat().st_mtime + cache_ttl.total_seconds(): logger.debug(f"Export cache file {file_path.name!r} was recently accessed") return False @@ -69,39 +67,19 @@ def clear_export_cache(file_path: Path) -> bool: return True -def remove_tmp_dir(dir_path: str) -> bool: - # we do not use locks here when handling a temporary directories - # because undesired race conditions are not possible here: - # 1. A temporary directory can be removed while parsing its name or checking the last modification date. - # In that case an exception is expected and will be handled by the cron job. - # 2. A temporary directory can be removed by a worker only when it is outdated. - # 3. Each temporary directory has a unique name, so the race condition when one process is creating a directory - # and another is removing it - impossible. - parsed = TmpDirManager.parse_tmp_child(dir_path) - assert parsed.operation == OperationType.EXPORT - assert parsed.type == TmpEntityType.DIR - cache_ttl = get_export_cache_ttl(parsed.instance_type) - - if timezone.now().timestamp() <= osp.getmtime(dir_path) + cache_ttl.total_seconds(): - return False - - shutil.rmtree(dir_path) - logger.debug(f"Temporary directory {dir_path} was successfully removed") - return True - class BaseCleanupThread(Thread, metaclass=ABCMeta): description: ClassVar[str] def __init__(self, stop_event: Event, *args, **kwargs) -> None: self._stop_event = stop_event - self._removed_entities = 0 + self._number_of_removed_objects = 0 self._exception = None super().__init__(*args, **kwargs, target=self._cleanup) @property - def removed_entities(self) -> int: - return self._removed_entities + def number_of_removed_objects(self) -> int: + return self._number_of_removed_objects @abstractmethod def _cleanup(self) -> None: ... @@ -120,18 +98,33 @@ class CleanupTmpDirThread(BaseCleanupThread): @suppress_exceptions def _cleanup(self) -> None: - for dir_path in TmpDirManager.get_export_related_dirs(): + # we do not use locks here when handling objects from tmp directory + # because undesired race conditions are not possible here: + # 1. A temporary file/directory can be removed while checking access time. + # In that case an exception is expected and is handled by the cron process. + # 2. A temporary file/directory can be removed by the cron job only when it is outdated. + # 3. Each temporary file/directory has a unique name, so the race condition when one process is creating an object + # and another is removing it - impossible. + for child in os.scandir(TmpDirManager.TMP_ROOT): # stop clean up process correctly before rq job timeout is ended if self._stop_event.is_set(): return try: - if remove_tmp_dir(dir_path): - self._removed_entities += 1 - except CacheFileOrDirPathParseError: - logger.warning(f"Cannot parse {dir_path.name}, skipping...") + if ( + child.stat().st_atime + timedelta( + days=TmpDirManager.TMP_FILE_OR_DIR_RETENTION_DAYS + ).total_seconds() < timezone.now().timestamp() + ): + if child.is_dir(): + shutil.rmtree(child.path) + else: + os.remove(child.path) + logger.debug(f"The {child.name} was successfully removed") + self._number_of_removed_objects += 1 + except FileNotFoundError: + # file or directory has been removed by another process continue - except Exception: log_exception(logger) @@ -156,7 +149,7 @@ def _cleanup(self) -> None: try: if clear_export_cache(child): - self._removed_entities += 1 + self._number_of_removed_objects += 1 except CacheFileOrDirPathParseError: logger.warning(f"Cannot parse {child.name}, skipping...") continue @@ -196,5 +189,5 @@ def cleanup(thread_class_path: str) -> None: logger.info( f"The {cleanup_thread.description!r} process has been successfully " f"completed after {int((finished_at - started_at).total_seconds())} seconds. " - f"{cleanup_thread.removed_entities} elements have been removed" + f"{cleanup_thread.number_of_removed_objects} elements have been removed" ) diff --git a/cvat/apps/dataset_manager/project.py b/cvat/apps/dataset_manager/project.py index c72ab380a58a..8f91e4f12651 100644 --- a/cvat/apps/dataset_manager/project.py +++ b/cvat/apps/dataset_manager/project.py @@ -12,7 +12,6 @@ from datumaro.components.errors import DatasetError, DatasetImportError, DatasetNotFoundError from django.conf import settings from django.db import transaction -from django.utils import timezone from cvat.apps.dataset_manager.task import TaskAnnotation from cvat.apps.dataset_manager.util import TmpDirManager @@ -156,7 +155,7 @@ def export( ) with ( - TmpDirManager.get_tmp_export_dir( + TmpDirManager.get_tmp_directory_for_export( instance_type=self.db_project.__class__.__name__, ) if not temp_dir else nullcontext(temp_dir) ) as temp_dir: @@ -174,7 +173,7 @@ def import_dataset(self, dataset_file, importer, **options): ) project_data.soft_attribute_import = True - with TmpDirManager.get_tmp_dir() as temp_dir: + with TmpDirManager.get_tmp_directory() as temp_dir: try: importer(dataset_file, temp_dir, project_data, load_data_callback=self.load_dataset_data, **options) except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found: diff --git a/cvat/apps/dataset_manager/task.py b/cvat/apps/dataset_manager/task.py index 9c614a4c5003..ebc007dbb898 100644 --- a/cvat/apps/dataset_manager/task.py +++ b/cvat/apps/dataset_manager/task.py @@ -15,7 +15,6 @@ from django.conf import settings from django.db import transaction from django.db.models.query import Prefetch, QuerySet -from django.utils import timezone from rest_framework.exceptions import ValidationError from cvat.apps.dataset_manager.annotation import AnnotationIR, AnnotationManager @@ -786,7 +785,7 @@ def export( ) with ( - TmpDirManager.get_tmp_export_dir( + TmpDirManager.get_tmp_directory_for_export( instance_type=self.db_job.__class__.__name__, ) if not temp_dir else nullcontext(temp_dir) ) as temp_dir: @@ -800,7 +799,7 @@ def import_annotations(self, src_file, importer, **options): ) self.delete() - with TmpDirManager.get_tmp_dir() as temp_dir: + with TmpDirManager.get_tmp_directory() as temp_dir: try: importer(src_file, temp_dir, job_data, **options) except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found: @@ -1001,7 +1000,7 @@ def export( ) with ( - TmpDirManager.get_tmp_export_dir( + TmpDirManager.get_tmp_directory_for_export( instance_type=self.db_task.__class__.__name__, ) if not temp_dir else nullcontext(temp_dir) ) as temp_dir: @@ -1015,7 +1014,7 @@ def import_annotations(self, src_file, importer, **options): ) self.delete() - with TmpDirManager.get_tmp_dir() as temp_dir: + with TmpDirManager.get_tmp_directory() as temp_dir: try: importer(src_file, temp_dir, task_data, **options) except (DatasetNotFoundError, CvatDatasetNotFoundError) as not_found: diff --git a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py index 5eec5ed9bdac..50817e674ad9 100644 --- a/cvat/apps/dataset_manager/tests/test_rest_api_formats.py +++ b/cvat/apps/dataset_manager/tests/test_rest_api_formats.py @@ -21,6 +21,7 @@ from typing import Any, Callable, ClassVar, Optional, overload from unittest.mock import DEFAULT as MOCK_DEFAULT from unittest.mock import MagicMock, patch +from pathlib import Path import av import numpy as np @@ -1520,7 +1521,7 @@ def _clear(*_, file_path: str): side_effect(set_condition, clear_removed_the_file), ) - clear_export_cache(file_path=file_path) + clear_export_cache(file_path=Path(file_path)) set_condition(clear_has_been_finished) mock_os_remove.assert_not_called() @@ -1699,7 +1700,7 @@ def _clear(*_, file_path: str): exited_by_timeout = False try: - clear_export_cache(file_path=file_path) + clear_export_cache(file_path=Path(file_path)) except LockNotAvailableError: # should come from waiting for get_export_cache_lock exited_by_timeout = True @@ -2027,7 +2028,7 @@ def test_cleanup_can_remove_file(self): patch("cvat.apps.dataset_manager.views.TTL_CONSTS", new={"task": timedelta(seconds=0)}), ): export_path = export(dst_format=format_name, task_id=task_id) - clear_export_cache(file_path=export_path) + clear_export_cache(file_path=Path(export_path)) self.assertFalse(osp.isfile(export_path)) @@ -2035,7 +2036,7 @@ def test_cleanup_can_remove_file(self): def test_cleanup_can_fail_if_no_file(self): from cvat.apps.dataset_manager.util import CacheFileOrDirPathParseError with self.assertRaises(CacheFileOrDirPathParseError): - clear_export_cache(file_path="non existent file path") + clear_export_cache(file_path=Path("non existent file path")) def test_cleanup_can_defer_removal_if_file_is_used_recently(self): from os import remove as original_remove @@ -2050,13 +2051,13 @@ def test_cleanup_can_defer_removal_if_file_is_used_recently(self): patch("cvat.apps.dataset_manager.cron.os.remove", side_effect=original_remove) as mock_os_remove, ): export_path = export(dst_format=format_name, task_id=task_id) - clear_export_cache(file_path=export_path) + clear_export_cache(file_path=Path(export_path)) mock_os_remove.assert_not_called() self.assertTrue(osp.isfile(export_path)) def test_cleanup_cron_job_can_delete_cached_files(self): - from cvat.apps.dataset_manager.cron import cron_export_cache_cleanup + from cvat.apps.dataset_manager.cron import cleanup def _get_project_task_job_ids(): project = self._create_project(projects["main"]) @@ -2100,7 +2101,7 @@ def _get_project_task_job_ids(): ): mock_rq_job = MagicMock(timeout=100) mock_rq_get_current_job.return_value = mock_rq_job - cron_export_cache_cleanup() + cleanup('cvat.apps.dataset_manager.cron.CleanupExportCacheThread') mock_clear_export_cache.assert_called_once() self.assertFalse(osp.exists(export_path)) diff --git a/cvat/apps/dataset_manager/util.py b/cvat/apps/dataset_manager/util.py index 5e04e6a70a0a..ef2ef26f75d8 100644 --- a/cvat/apps/dataset_manager/util.py +++ b/cvat/apps/dataset_manager/util.py @@ -1,5 +1,5 @@ # Copyright (C) 2019-2022 Intel Corporation -# Copyright (C) 2023-2024 CVAT.ai Corporation +# Copyright (C) 2023-2025 CVAT.ai Corporation # # SPDX-License-Identifier: MIT @@ -14,7 +14,6 @@ from copy import deepcopy from datetime import timedelta from enum import Enum -from pathlib import Path from threading import Lock from typing import Any @@ -157,13 +156,11 @@ def get_export_cache_lock( class OperationType(str, Enum): EXPORT = "export" - IMPORT = "import" @classmethod def values(cls) -> list[str]: return list(map(lambda x: x.value, cls)) -# todo: rename class ExportFileType(str, Enum): ANNOTATIONS = "annotations" BACKUP = "backup" @@ -200,43 +197,25 @@ class ParsedDatasetFilename(_ParsedExportFilename): class ParsedBackupFilename(_ParsedExportFilename): pass -class TmpEntityType(str, Enum): - DIR = "dir" - FILE = "file" - -@attrs.frozen -class ParsedTmpEntity: - instance_type: InstanceType = attrs.field(converter=InstanceType) - operation: OperationType = attrs.field(converter=OperationType) - -@attrs.frozen -class ParsedTmpDir(ParsedTmpEntity): - type: TmpEntityType = attrs.field(init=False, default=TmpEntityType.DIR) - -@attrs.frozen -class ParsedTmpFile(ParsedTmpEntity): - type: TmpEntityType = attrs.field(init=False, default=TmpEntityType.FILE) - class TmpDirManager: SPLITTER = "-" TMP_ROOT = settings.TMP_FILES_ROOT - - @classmethod - def get_export_related_dirs(cls) -> Generator[Path, Any, Any]: - for item in Path(cls.TMP_ROOT).glob(f"{OperationType.EXPORT}*"): - if item.is_dir(): - yield item + TMP_FILE_OR_DIR_RETENTION_DAYS = settings.TMP_FILE_OR_DIR_RETENTION_DAYS @classmethod @contextmanager - def get_tmp_dir( + def get_tmp_directory( cls, *, prefix: str | None = None, suffix: str | None = None, ignore_cleanup_errors: bool | None = None, ) -> Generator[str, Any, Any]: + """ + The method allows to create a temporary directory and + ensures that the parent directory uses the CVAT tmp directory + """ params = {} for k, v in { "prefix": prefix, @@ -251,45 +230,17 @@ def get_tmp_dir( @classmethod @contextmanager - def get_tmp_export_dir( + def get_tmp_directory_for_export( cls, *, instance_type: str, ) -> Generator[str, Any, Any]: instance_type = InstanceType(instance_type.lower()) - with cls.get_tmp_dir( + with cls.get_tmp_directory( prefix=cls.SPLITTER.join([OperationType.EXPORT, instance_type]) + cls.SPLITTER ) as tmp_dir: yield tmp_dir - @classmethod - def parse_tmp_child(cls, child_path: os.PathLike[str]) -> ParsedTmpDir | ParsedTmpFile: - child_path = Path(osp.normpath(child_path)) - - if child_path.is_dir(): - dir_name = child_path.name - - basename_match = re.match( - ( - rf"^(?P{'|'.join(OperationType.values())}){cls.SPLITTER}" - rf"(?P{'|'.join(InstanceType.values())}){cls.SPLITTER}" - ), - dir_name, - ) - - if not basename_match: - raise CacheFileOrDirPathParseError(f"Couldn't parse directory name: {dir_name!r}") - - try: - parsed_dir_name = ParsedTmpDir( - **basename_match.groupdict() - ) - except ValueError as ex: - raise CacheFileOrDirPathParseError(f"Couldn't parse directory name: {dir_name!r}") from ex - - return parsed_dir_name - - raise NotImplementedError() class ExportCacheManager: SPLITTER = "-" @@ -355,15 +306,11 @@ def make_backup_file_path( return osp.join(settings.EXPORT_CACHE_ROOT, filename) @classmethod - def parse_file_path( - cls, file_path: os.PathLike[str], + def parse_filename( + cls, filename: str, ) -> ParsedDatasetFilename | ParsedBackupFilename: - file_path = osp.normpath(file_path) - basename = osp.split(file_path)[1] - basename, file_ext = osp.splitext(basename) + basename, file_ext = osp.splitext(filename) file_ext = file_ext.strip(".").lower() - - # handle file name basename_match = re.fullmatch( ( rf"^(?P{'|'.join(InstanceType.values())})" diff --git a/cvat/apps/dataset_manager/views.py b/cvat/apps/dataset_manager/views.py index 20c14a73eb5f..2812dd4f3162 100644 --- a/cvat/apps/dataset_manager/views.py +++ b/cvat/apps/dataset_manager/views.py @@ -167,7 +167,7 @@ def export( extend_export_file_lifetime(output_path) return output_path - with TmpDirManager.get_tmp_export_dir(instance_type=instance_type) as temp_dir: + with TmpDirManager.get_tmp_directory_for_export(instance_type=instance_type) as temp_dir: temp_file = osp.join(temp_dir, 'result') export_fn(db_instance.id, temp_file, format_name=dst_format, server_url=server_url, save_images=save_images, temp_dir=temp_dir) diff --git a/cvat/apps/engine/backup.py b/cvat/apps/engine/backup.py index c32c136f248e..0bb9868b5ae3 100644 --- a/cvat/apps/engine/backup.py +++ b/cvat/apps/engine/backup.py @@ -1083,7 +1083,7 @@ def create_backup( extend_export_file_lifetime(output_path) return output_path - with TmpDirManager.get_tmp_export_dir(instance_type=instance_type) as tmp_dir: + with TmpDirManager.get_tmp_directory_for_export(instance_type=instance_type) as tmp_dir: temp_file = os.path.join(tmp_dir, 'dump') exporter = Exporter(db_instance.id) exporter.export_to(temp_file) diff --git a/cvat/apps/engine/tests/test_rest_api.py b/cvat/apps/engine/tests/test_rest_api.py index cb613c60d048..2b477702f8fb 100644 --- a/cvat/apps/engine/tests/test_rest_api.py +++ b/cvat/apps/engine/tests/test_rest_api.py @@ -3108,7 +3108,7 @@ def test_api_v2_tasks_id_export_no_auth(self): self._run_api_v2_tasks_id_export_import(None) def test_can_remove_export_cache_automatically_after_successful_export(self): - from cvat.apps.dataset_manager.cron import clear_export_cache, cron_export_cache_cleanup + from cvat.apps.dataset_manager.cron import clear_export_cache, cleanup self._create_tasks() task_id = self.tasks[0]["id"] user = self.admin @@ -3127,7 +3127,7 @@ def test_can_remove_export_cache_automatically_after_successful_export(self): ): mock_rq_job = mock.MagicMock(timeout=100) mock_rq_get_current_job.return_value = mock_rq_job - cron_export_cache_cleanup() + cleanup("cvat.apps.dataset_manager.cron.CleanupExportCacheThread") mock_clear_export_cache.assert_not_called() response = self._run_api_v2_tasks_id_export(task_id, user) @@ -3148,7 +3148,7 @@ def test_can_remove_export_cache_automatically_after_successful_export(self): mock.patch('cvat.apps.dataset_manager.views.TASK_CACHE_TTL', new=timedelta(seconds=0)), mock.patch('cvat.apps.dataset_manager.views.TTL_CONSTS', new={'task': timedelta(seconds=0)}), ): - cron_export_cache_cleanup() + cleanup("cvat.apps.dataset_manager.cron.CleanupExportCacheThread") mock_clear_export_cache.assert_called_once() self.assertFalse(os.path.exists(file_path)) diff --git a/cvat/settings/base.py b/cvat/settings/base.py index c28da9b9f7aa..73ef6c4f7cc1 100644 --- a/cvat/settings/base.py +++ b/cvat/settings/base.py @@ -359,7 +359,6 @@ class CVAT_QUEUES(Enum): 'func': 'cvat.apps.dataset_manager.cron.cleanup', # Run twice a day (at midnight and at noon) 'cron_string': '0 0,12 * * *', - # 'cron_string': '50 11 * * *', 'args': ('cvat.apps.dataset_manager.cron.CleanupExportCacheThread',), }, { @@ -368,7 +367,6 @@ class CVAT_QUEUES(Enum): 'func': 'cvat.apps.dataset_manager.cron.cleanup', # Run once a day 'cron_string': '0 18 * * *', - # 'cron_string': '17 12 * * *', 'args': ('cvat.apps.dataset_manager.cron.CleanupTmpDirThread',), } ] @@ -766,3 +764,6 @@ class CVAT_QUEUES(Enum): CLOUD_DATA_DOWNLOADING_MAX_THREADS_NUMBER = 4 CLOUD_DATA_DOWNLOADING_NUMBER_OF_FILES_PER_THREAD = 1000 + +# Indicates the maximum number of days a file or directory is retained in the temporary directory +TMP_FILE_OR_DIR_RETENTION_DAYS = 14 diff --git a/dev/format_python_code.sh b/dev/format_python_code.sh index f7220679073c..6d4288ec1b4a 100755 --- a/dev/format_python_code.sh +++ b/dev/format_python_code.sh @@ -36,6 +36,7 @@ for paths in \ "cvat/apps/dataset_manager/tests/utils.py" \ "cvat/apps/events/signals.py" \ "cvat/apps/engine/management/commands/syncperiodicjobs.py" \ + "cvat/apps/dataset_manager/management/commands/exportcachecleanup.py" \ ; do ${BLACK} -- ${paths} ${ISORT} -- ${paths}