Skip to content

Commit

Permalink
Refactor a bit
Browse files Browse the repository at this point in the history
  • Loading branch information
Marishka17 committed Dec 10, 2024
1 parent 4723737 commit f20eadf
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 46 deletions.
2 changes: 1 addition & 1 deletion cvat/apps/dataset_manager/default_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
DATASET_EXPORT_LOCKED_RETRY_INTERVAL = int(os.getenv("CVAT_DATASET_EXPORT_LOCKED_RETRY_INTERVAL", 60))
"Retry interval for cases the export cache lock was unavailable, in seconds"

EXPORT_CACHE_DIR_NAME = "export_cache"
EXPORT_CACHE_DIR_NAME = "export_cache"
2 changes: 0 additions & 2 deletions cvat/apps/dataset_manager/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
from django.db import models
from pottery import Redlock

from cvat.apps.engine.models import Job, Project, Task


def current_function_name(depth=1):
    """Return the name of the function `depth` frames up the call stack.

    With the default depth=1 this is the immediate caller's name;
    depth=0 names this function itself.
    """
    frame = inspect.currentframe()
    outer_frames = inspect.getouterframes(frame)
    return outer_frames[depth].function
Expand Down
37 changes: 24 additions & 13 deletions cvat/apps/dataset_manager/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tempfile
from datetime import timedelta

import importlib
import django_rq
import rq
from django.conf import settings
Expand All @@ -23,6 +24,7 @@
from cvat.apps.engine.models import Job, Project, Task
from cvat.apps.engine.utils import get_rq_lock_by_user

from django.db.models import QuerySet
from .formats.registry import EXPORT_FORMATS, IMPORT_FORMATS
from .util import (
LockNotAvailableError,
Expand Down Expand Up @@ -113,7 +115,7 @@ def export(dst_format, project_id=None, task_id=None, job_id=None, server_url=No
db_instance = Job.objects.get(pk=job_id)

cache_ttl = get_export_cache_ttl(db_instance)
cache_dir = db_instance.get_export_cache_directory()
cache_dir = db_instance.get_export_cache_directory(create=True)

# As we're not locking the db object here, it can be updated by the time of actual export.
# The file will be saved with the older timestamp.
Expand Down Expand Up @@ -239,34 +241,43 @@ def clear_export_cache(file_path: str, logger: logging.Logger) -> None:
raise


def cron_export_cache_cleanup(path_to_model: str) -> None:
    """Periodic (cron) job that removes outdated export cache files.

    :param path_to_model: dotted path to one of the exportable models
        (``Project``, ``Task`` or ``Job``), e.g.
        ``"cvat.apps.engine.models.Task"``. Passed as a string because the
        scheduler config (settings/base.py) can only serialize strings.
    """
    assert isinstance(path_to_model, str)

    started_at = timezone.now()
    module_name, model_name = path_to_model.rsplit('.', 1)
    module = importlib.import_module(module_name)
    Model = getattr(module, model_name)
    # Guard against a misconfigured scheduler entry pointing at an arbitrary class.
    assert Model in (Project, Task, Job)

    logger = ServerLogManager(__name__).glob

    # Only instances exported within the last month can still hold cache
    # entries worth inspecting here; older ones are presumably covered by the
    # regular TTL-based cleanup — TODO confirm.
    one_month_ago = timezone.now() - timedelta(days=30)
    queryset: QuerySet[Project | Task | Job] = Model.objects.filter(last_export_date__gte=one_month_ago)

    # iterator() streams rows instead of caching the whole queryset in memory.
    for instance in queryset.iterator():
        instance_dir_path = Path(instance.get_dirname())
        export_cache_dir_path = Path(instance.get_export_cache_directory())

        if not export_cache_dir_path.exists():
            logger.debug(f"The {export_cache_dir_path.relative_to(instance_dir_path)} path does not exist, skipping...")
            continue

        for child in export_cache_dir_path.iterdir():
            if not child.is_file():
                logger.warning(f'Unexpected file found in export cache: {child.relative_to(instance_dir_path)}')
                continue

            # Best effort: clear_export_cache logs its own failures; one bad
            # entry must not abort the whole cron run.
            with suppress(Exception):
                clear_export_cache(child, logger)

    finished_at = timezone.now()
    logger.info(
        f"Clearing the {model_name}'s export cache has been successfully "
        f"completed after {(finished_at - started_at).total_seconds()} seconds..."
    )


def get_export_formats():
    """Return all registered export formats as a list."""
    return [*EXPORT_FORMATS.values()]

Expand Down
1 change: 0 additions & 1 deletion cvat/apps/engine/background.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,6 @@ def setup_background_job(
db_storage = None
result_url = self.make_result_url()

# TODO: move into worker?
self.db_instance.touch_last_export_date()

with get_rq_lock_by_user(queue, user_id):
Expand Down
3 changes: 1 addition & 2 deletions cvat/apps/engine/backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -1017,13 +1017,12 @@ def _import_project(filename, user, org_id):

def create_backup(db_instance: models.Project | models.Task, Exporter, output_path, logger, cache_ttl):
try:
cache_dir = db_instance.get_export_cache_directory()
cache_dir = db_instance.get_export_cache_directory(create=True)
output_path = os.path.join(cache_dir, output_path)

instance_time = timezone.localtime(db_instance.updated_date).timestamp()
if not (os.path.exists(output_path) and \
instance_time <= os.path.getmtime(output_path)):
os.makedirs(cache_dir, exist_ok=True)
with tempfile.TemporaryDirectory(dir=cache_dir) as temp_dir:
temp_file = os.path.join(temp_dir, 'dump')
exporter = Exporter(db_instance.id)
Expand Down
44 changes: 19 additions & 25 deletions cvat/apps/engine/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,25 @@ def get_dirname(self) -> str:
def get_tmp_dirname(self) -> str:
return os.path.join(self.get_dirname(), "tmp")

def get_export_cache_directory(self) -> str:
def get_export_cache_directory(self, create: bool = False) -> str:
base_dir = os.path.abspath(self.get_dirname())
cache_dir = os.path.join(base_dir, settings.EXPORT_CACHE_DIR_NAME)

if os.path.isdir(base_dir):
return os.path.join(base_dir, settings.EXPORT_CACHE_DIR_NAME)
if create:
os.makedirs(cache_dir, exist_ok=True)

raise FileNotFoundError(
'{self.__class__.__name__}: dir {base_dir} does not exist'
)
return cache_dir


class _Exportable(models.Model):
    # Mixin for models whose datasets/backups can be exported (Project, Task,
    # Job). Tracks when the instance was last exported so the periodic export
    # cache cleanup can limit its scan to recently exported instances.
    class Meta:
        abstract = True

    # Set via touch_last_export_date(); null until the first export.
    last_export_date = models.DateTimeField(null=True)

    def touch_last_export_date(self):
        """Record 'now' as the last export time and persist only that field."""
        self.last_export_date = timezone.now()
        self.save(update_fields=["last_export_date"])

@transaction.atomic(savepoint=False)
def clear_annotations_in_jobs(job_ids):
Expand All @@ -463,7 +473,7 @@ def clear_annotations_in_jobs(job_ids):
LabeledImageAttributeVal.objects.filter(image__job_id__in=job_ids_chunk).delete()
LabeledImage.objects.filter(job_id__in=job_ids_chunk).delete()

class Project(TimestampedModel, _FileSystemRelatedModel):
class Project(TimestampedModel, _FileSystemRelatedModel, _Exportable):
name = SafeCharField(max_length=256)
owner = models.ForeignKey(User, null=True, blank=True,
on_delete=models.SET_NULL, related_name="+")
Expand All @@ -481,12 +491,6 @@ class Project(TimestampedModel, _FileSystemRelatedModel):
target_storage = models.ForeignKey('Storage', null=True, default=None,
blank=True, on_delete=models.SET_NULL, related_name='+')

last_export_date = models.DateTimeField(null=True)

def touch_last_export_date(self):
self.last_export_date = timezone.now()
self.save(update_fields=["last_export_date"])

def get_labels(self, prefetch=False):
queryset = self.label_set.filter(parent__isnull=True).select_related('skeleton')
return queryset.prefetch_related(
Expand Down Expand Up @@ -544,7 +548,7 @@ def with_job_summary(self):
)
)

class Task(TimestampedModel, _FileSystemRelatedModel):
class Task(TimestampedModel, _FileSystemRelatedModel, _Exportable):
objects = TaskQuerySet.as_manager()

project = models.ForeignKey(Project, on_delete=models.CASCADE,
Expand Down Expand Up @@ -573,18 +577,13 @@ class Task(TimestampedModel, _FileSystemRelatedModel):
blank=True, on_delete=models.SET_NULL, related_name='+')
target_storage = models.ForeignKey('Storage', null=True, default=None,
blank=True, on_delete=models.SET_NULL, related_name='+')
last_export_date = models.DateTimeField(null=True)

segment_set: models.manager.RelatedManager[Segment]

# Extend default permission model
class Meta:
default_permissions = ()

def touch_last_export_date(self):
self.last_export_date = timezone.now()
self.save(update_fields=["last_export_date"])

def get_labels(self, prefetch=False):
project = self.project
if project:
Expand Down Expand Up @@ -840,7 +839,7 @@ def _validate_constraints(self, obj: Dict[str, Any]):



class Job(TimestampedModel, _FileSystemRelatedModel):
class Job(TimestampedModel, _FileSystemRelatedModel, _Exportable):
objects = JobQuerySet.as_manager()

segment = models.ForeignKey(Segment, on_delete=models.CASCADE)
Expand All @@ -860,11 +859,6 @@ class Job(TimestampedModel, _FileSystemRelatedModel):
default=StateChoice.NEW)
type = models.CharField(max_length=32, choices=JobType.choices(),
default=JobType.ANNOTATION)
last_export_date = models.DateTimeField(null=True)

def touch_last_export_date(self):
self.last_export_date = timezone.now()
self.save(update_fields=["last_export_date"])

def get_target_storage(self) -> Optional[Storage]:
return self.segment.task.target_storage
Expand Down
4 changes: 2 additions & 2 deletions cvat/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,8 @@ class CVAT_QUEUES(Enum):
*(
{
'queue': CVAT_QUEUES.CLEANING.value,
'id': f'clear_{model.lower()}_export_cache',
'func': 'cvat.apps.dataset_manager.views.cron_job_to_clear_export_cache',
'id': f'cron_{model.lower()}_export_cache_cleanup',
'func': 'cvat.apps.dataset_manager.views.cron_export_cache_cleanup',
# Run once a day at midnight
'cron_string': '0 0 * * *',
# 'cron_string': '05 17 * * *',
Expand Down

0 comments on commit f20eadf

Please sign in to comment.