Skip to content

Commit

Permalink
Move export cache cleaning into the cron jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
Marishka17 committed Dec 9, 2024
1 parent 094c62d commit 4723737
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 52 deletions.
2 changes: 2 additions & 0 deletions cvat/apps/dataset_manager/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,5 @@

DATASET_EXPORT_LOCKED_RETRY_INTERVAL = int(os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL", 60))
"Retry interval for cases the export cache lock was unavailable, in seconds"

# Name of the per-instance subdirectory where export artifacts are cached
# (shared by dataset_manager and the engine models' get_export_cache_directory()).
EXPORT_CACHE_DIR_NAME = "export_cache"
15 changes: 1 addition & 14 deletions cvat/apps/dataset_manager/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,19 +146,6 @@ def get_export_cache_lock(
lock.release()


EXPORT_CACHE_DIR_NAME = 'export_cache'


def get_export_cache_dir(db_instance: Project | Task | Job) -> str:
base_dir = osp.abspath(db_instance.get_dirname())

if osp.isdir(base_dir):
return osp.join(base_dir, EXPORT_CACHE_DIR_NAME)
else:
raise FileNotFoundError(
'{} dir {} does not exist'.format(db_instance.__class__.__name__, base_dir)
)


def make_export_filename(
dst_dir: str,
Expand Down Expand Up @@ -207,7 +194,7 @@ def parse_export_file_path(file_path: os.PathLike[str]) -> ParsedExportFilename:
if not basename_match:
raise ValueError(f"Couldn't parse filename components in '{basename}'")

dirname_match = re.search(rf'/(jobs|tasks|projects)/\d+/{EXPORT_CACHE_DIR_NAME}$', dirname)
dirname_match = re.search(rf'/(jobs|tasks|projects)/\d+/{settings.EXPORT_CACHE_DIR_NAME}$', dirname)
if not dirname_match:
raise ValueError(f"Couldn't parse instance type in '{dirname}'")

Expand Down
62 changes: 41 additions & 21 deletions cvat/apps/dataset_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
from django.conf import settings
from django.utils import timezone
from rq_scheduler import Scheduler
from pathlib import Path
from contextlib import suppress

import cvat.apps.dataset_manager.project as project
import cvat.apps.dataset_manager.task as task
Expand All @@ -25,10 +27,9 @@
from .util import (
LockNotAvailableError,
current_function_name, get_export_cache_lock,
get_export_cache_dir, make_export_filename,
make_export_filename,
parse_export_file_path
)
from .util import EXPORT_CACHE_DIR_NAME # pylint: disable=unused-import

slogger = ServerLogManager(__name__)

Expand Down Expand Up @@ -112,8 +113,7 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
db_instance = Job.objects.get(pk=job_id)

cache_ttl = get_export_cache_ttl(db_instance)

cache_dir = get_export_cache_dir(db_instance)
cache_dir = db_instance.get_export_cache_directory()

# As we're not locking the db object here, it can be updated by the time of actual export.
# The file will be saved with the older timestamp.
Expand Down Expand Up @@ -205,48 +205,68 @@ def export_project_annotations(project_id, dst_format=None, server_url=None):
class FileIsBeingUsedError(Exception):
pass

def clear_export_cache(file_path: str, file_ctime: float, logger: logging.Logger) -> None:
# file_ctime is for backward compatibility with older RQ jobs, not needed now

# TODO: write a migration to delete all clear_export_cache scheduled jobs from scheduler
def clear_export_cache(file_path: str, logger: logging.Logger) -> None:
try:
# TODO: update after 8721
with get_export_cache_lock(
file_path,
block=True,
acquire_timeout=EXPORT_CACHE_LOCK_TIMEOUT,
ttl=rq.get_current_job().timeout,
):
if not osp.exists(file_path):
raise FileNotFoundError("Export cache file '{}' doesn't exist".format(file_path))
logger.error("Export cache file '{}' doesn't exist".format(file_path))

parsed_filename = parse_export_file_path(file_path)
cache_ttl = get_export_cache_ttl(parsed_filename.instance_type)

if timezone.now().timestamp() <= osp.getmtime(file_path) + cache_ttl.total_seconds():
# Need to retry later, the export is in use
_retry_current_rq_job(cache_ttl)
logger.info(
"Export cache file '{}' is recently accessed, will retry in {}".format(
file_path, cache_ttl
)
"Export cache file '{}' is recently accessed".format(file_path)
)
raise FileIsBeingUsedError # should be handled by the worker
raise FileIsBeingUsedError

# TODO: maybe remove all outdated exports
os.remove(file_path)
logger.info("Export cache file '{}' successfully removed".format(file_path))
logger.debug("Export cache file '{}' successfully removed".format(file_path))
except LockNotAvailableError:
# Need to retry later if the lock was not available
_retry_current_rq_job(EXPORT_LOCKED_RETRY_INTERVAL)
logger.info(
"Failed to acquire export cache lock. Retrying in {}".format(
EXPORT_LOCKED_RETRY_INTERVAL
)
"Failed to acquire export cache lock for the file: {file_path}."
)
raise
except Exception:
log_exception(logger)
raise


def cron_job_to_clear_export_cache(Model: str) -> None:
    """Periodic job: remove outdated export cache files for one model class.

    Args:
        Model: dotted import path of a model class, e.g.
            ``"cvat.apps.engine.models.Project"``. Passed as a string because
            scheduled-job arguments must be serializable.

    Only instances exported within the last 30 days are scanned (older ones
    have no fresh cache worth keeping anyway). Individual file removal errors
    are suppressed so one bad file does not abort the whole sweep.
    """
    import importlib

    assert isinstance(Model, str)

    # Resolve the dotted path into the actual model class.
    module_name, model_name = Model.rsplit('.', 1)
    module = importlib.import_module(module_name)
    model_class = getattr(module, model_name)

    logger = ServerLogManager(__name__).glob

    one_month_ago = timezone.now() - timedelta(days=30)
    queryset = model_class.objects.filter(last_export_date__gte=one_month_ago)

    for instance in queryset:
        try:
            export_cache_dir_path = Path(instance.get_export_cache_directory())
        except FileNotFoundError as ex:
            # The instance's base directory may already be gone; not fatal.
            logger.warning(str(ex))
            continue

        for child in export_cache_dir_path.iterdir():
            if not child.is_file():
                # BUG FIX: the original called logger.exception() outside an
                # except block (which logs a bogus "NoneType: None" traceback)
                # and described the non-file entry as a "file".
                logger.warning(f'Unexpected non-file entry found in export cache: {child.name}')
                continue

            # Best-effort cleanup: clear_export_cache logs its own failures.
            with suppress(Exception):
                clear_export_cache(str(child), logger)

def get_export_formats():
    """Return all registered export formats as a list."""
    return [fmt for fmt in EXPORT_FORMATS.values()]

Expand Down
3 changes: 3 additions & 0 deletions cvat/apps/engine/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,9 @@ def setup_background_job(
db_storage = None
result_url = self.make_result_url()

# TODO: move into worker?
self.db_instance.touch_last_export_date()

with get_rq_lock_by_user(queue, user_id):
queue.enqueue_call(
func=func,
Expand Down
6 changes: 3 additions & 3 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from cvat.apps.engine.cloud_provider import import_resource_from_cloud_storage
from cvat.apps.engine.location import StorageType, get_location_configuration
from cvat.apps.engine.permissions import get_cloud_storage_for_import_or_export
from cvat.apps.dataset_manager.views import get_export_cache_dir, log_exception
from cvat.apps.dataset_manager.views import log_exception
from cvat.apps.dataset_manager.bindings import CvatImportError

slogger = ServerLogManager(__name__)
Expand Down Expand Up @@ -1015,9 +1015,9 @@ def _import_project(filename, user, org_id):
db_project = project_importer.import_project()
return db_project.id

def create_backup(db_instance, Exporter, output_path, logger, cache_ttl):
def create_backup(db_instance: models.Project | models.Task, Exporter, output_path, logger, cache_ttl):
try:
cache_dir = get_export_cache_dir(db_instance)
cache_dir = db_instance.get_export_cache_directory()
output_path = os.path.join(cache_dir, output_path)

instance_time = timezone.localtime(db_instance.updated_date).timestamp()
Expand Down
2 changes: 2 additions & 0 deletions cvat/apps/engine/management/commands/syncperiodicjobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ def handle(self, *args, **options):
cron_string=job_definition['cron_string'],
func=job_definition['func'],
id=job_id,
args=job_definition.get('args'),
kwargs=job_definition.get('kwargs'),
)

queue.connection.sadd(periodic_jobs_key, job_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.15 on 2024-12-09 16:51

from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds a nullable `last_export_date` timestamp to Job, Project and Task.
    # The export-cache cleanup cron jobs use it to select instances that were
    # exported recently; NULL means the instance was never exported.

    dependencies = [
        ("engine", "0086_profile_has_analytics_access"),
    ]

    operations = [
        migrations.AddField(
            model_name="job",
            name="last_export_date",
            field=models.DateTimeField(null=True),
        ),
        migrations.AddField(
            model_name="project",
            name="last_export_date",
            field=models.DateTimeField(null=True),
        ),
        migrations.AddField(
            model_name="task",
            name="last_export_date",
            field=models.DateTimeField(null=True),
        ),
    ]
61 changes: 47 additions & 14 deletions cvat/apps/engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import re
import shutil
import uuid
from abc import ABCMeta, abstractmethod
from enum import Enum
from functools import cached_property
from typing import Any, ClassVar, Collection, Dict, Optional
Expand All @@ -21,7 +22,9 @@
from django.db import IntegrityError, models, transaction
from django.db.models import Q, TextChoices
from django.db.models.fields import FloatField
from django.db.models.base import ModelBase
from django.utils.translation import gettext_lazy as _
from django.utils import timezone
from drf_spectacular.types import OpenApiTypes
from drf_spectacular.utils import extend_schema_field

Expand Down Expand Up @@ -424,6 +427,30 @@ class Meta:
def touch(self) -> None:
self.save(update_fields=["updated_date"])

class ABCModelMeta(ABCMeta, ModelBase):
    # Combined metaclass so a Django model can also declare @abstractmethod
    # members: without it, mixing models.Model (metaclass ModelBase) with an
    # ABC raises a metaclass conflict at class-creation time.
    pass

class _FileSystemRelatedModel(models.Model, metaclass=ABCModelMeta):
    """Abstract base for models that own a directory on the local filesystem.

    Subclasses must implement get_dirname(); the tmp and export-cache paths
    are derived from it.
    """

    class Meta:
        abstract = True

    @abstractmethod
    def get_dirname(self) -> str:
        ...

    def get_tmp_dirname(self) -> str:
        """Return the path of the instance's temporary working directory."""
        return os.path.join(self.get_dirname(), "tmp")

    def get_export_cache_directory(self) -> str:
        """Return the path of the instance's export cache directory.

        Raises:
            FileNotFoundError: if the instance's base directory does not exist.
        """
        base_dir = os.path.abspath(self.get_dirname())

        if os.path.isdir(base_dir):
            return os.path.join(base_dir, settings.EXPORT_CACHE_DIR_NAME)

        # BUG FIX: the original message was a plain string literal containing
        # {…} placeholders (missing the f prefix), so the class name and path
        # were never interpolated into the error text.
        raise FileNotFoundError(
            f'{self.__class__.__name__}: dir {base_dir} does not exist'
        )

@transaction.atomic(savepoint=False)
def clear_annotations_in_jobs(job_ids):
for job_ids_chunk in chunked_list(job_ids, chunk_size=1000):
Expand All @@ -436,7 +463,7 @@ def clear_annotations_in_jobs(job_ids):
LabeledImageAttributeVal.objects.filter(image__job_id__in=job_ids_chunk).delete()
LabeledImage.objects.filter(job_id__in=job_ids_chunk).delete()

class Project(TimestampedModel):
class Project(TimestampedModel, _FileSystemRelatedModel):
name = SafeCharField(max_length=256)
owner = models.ForeignKey(User, null=True, blank=True,
on_delete=models.SET_NULL, related_name="+")
Expand All @@ -454,6 +481,12 @@ class Project(TimestampedModel):
target_storage = models.ForeignKey('Storage', null=True, default=None,
blank=True, on_delete=models.SET_NULL, related_name='+')

last_export_date = models.DateTimeField(null=True)

def touch_last_export_date(self):
    # Record that an export of this project was just requested; the cleanup
    # cron job uses last_export_date to find recently exported instances.
    # Only the one field is written to avoid touching updated_date.
    self.last_export_date = timezone.now()
    self.save(update_fields=["last_export_date"])

def get_labels(self, prefetch=False):
queryset = self.label_set.filter(parent__isnull=True).select_related('skeleton')
return queryset.prefetch_related(
Expand All @@ -463,9 +496,6 @@ def get_labels(self, prefetch=False):
def get_dirname(self):
return os.path.join(settings.PROJECTS_ROOT, str(self.id))

def get_tmp_dirname(self):
return os.path.join(self.get_dirname(), "tmp")

def is_job_staff(self, user_id):
if self.owner == user_id:
return True
Expand Down Expand Up @@ -514,7 +544,7 @@ def with_job_summary(self):
)
)

class Task(TimestampedModel):
class Task(TimestampedModel, _FileSystemRelatedModel):
objects = TaskQuerySet.as_manager()

project = models.ForeignKey(Project, on_delete=models.CASCADE,
Expand Down Expand Up @@ -543,13 +573,18 @@ class Task(TimestampedModel):
blank=True, on_delete=models.SET_NULL, related_name='+')
target_storage = models.ForeignKey('Storage', null=True, default=None,
blank=True, on_delete=models.SET_NULL, related_name='+')
last_export_date = models.DateTimeField(null=True)

segment_set: models.manager.RelatedManager[Segment]

# Extend default permission model
class Meta:
default_permissions = ()

def touch_last_export_date(self):
    # Record that an export of this task was just requested; the cleanup
    # cron job uses last_export_date to find recently exported instances.
    # Only the one field is written to avoid touching updated_date.
    self.last_export_date = timezone.now()
    self.save(update_fields=["last_export_date"])

def get_labels(self, prefetch=False):
project = self.project
if project:
Expand All @@ -560,12 +595,9 @@ def get_labels(self, prefetch=False):
'attributespec_set', 'sublabels__attributespec_set',
) if prefetch else queryset

def get_dirname(self):
def get_dirname(self) -> str:
return os.path.join(settings.TASKS_ROOT, str(self.id))

def get_tmp_dirname(self):
return os.path.join(self.get_dirname(), "tmp")

def is_job_staff(self, user_id):
if self.owner == user_id:
return True
Expand Down Expand Up @@ -808,7 +840,7 @@ def _validate_constraints(self, obj: Dict[str, Any]):



class Job(TimestampedModel):
class Job(TimestampedModel, _FileSystemRelatedModel):
objects = JobQuerySet.as_manager()

segment = models.ForeignKey(Segment, on_delete=models.CASCADE)
Expand All @@ -826,9 +858,13 @@ class Job(TimestampedModel):
default=StageChoice.ANNOTATION)
state = models.CharField(max_length=32, choices=StateChoice.choices(),
default=StateChoice.NEW)

type = models.CharField(max_length=32, choices=JobType.choices(),
default=JobType.ANNOTATION)
last_export_date = models.DateTimeField(null=True)

def touch_last_export_date(self):
    # Record that an export of this job was just requested; the cleanup
    # cron job uses last_export_date to find recently exported instances.
    # Only the one field is written to avoid touching updated_date.
    self.last_export_date = timezone.now()
    self.save(update_fields=["last_export_date"])

def get_target_storage(self) -> Optional[Storage]:
return self.segment.task.target_storage
Expand All @@ -839,9 +875,6 @@ def get_source_storage(self) -> Optional[Storage]:
def get_dirname(self):
return os.path.join(settings.JOBS_ROOT, str(self.id))

def get_tmp_dirname(self):
return os.path.join(self.get_dirname(), 'tmp')

@extend_schema_field(OpenApiTypes.INT)
def get_project_id(self):
project = self.segment.task.project
Expand Down
12 changes: 12 additions & 0 deletions cvat/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,18 @@ class CVAT_QUEUES(Enum):
'func': 'cvat.apps.iam.utils.clean_up_sessions',
'cron_string': '0 0 * * *',
},
*(
{
'queue': CVAT_QUEUES.CLEANING.value,
'id': f'clear_{model.lower()}_export_cache',
'func': 'cvat.apps.dataset_manager.views.cron_job_to_clear_export_cache',
# Run once a day at midnight
'cron_string': '0 0 * * *',
# 'cron_string': '05 17 * * *',
'args': (f'cvat.apps.engine.models.{model.title()}',),
}
for model in ('project', 'task', 'job')
),
]

# JavaScript and CSS compression
Expand Down

0 comments on commit 4723737

Please sign in to comment.