Skip to content

Commit

Permalink
Merge pull request #136 from phischolz/wip/fix-s3-cleanup#main
Browse files Browse the repository at this point in the history
make celery settings accessible via env
  • Loading branch information
s4ke authored Jul 9, 2024
2 parents 5d4c568 + 54306e6 commit b5433fb
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 15 deletions.
2 changes: 1 addition & 1 deletion skipper/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ SQLAlchemy = "==1.4.51"
Deprecated = "==1.2.14"
protobuf = "==3.20.3"
wheel = "0.38.4"
certifi = "==2024.2.2"
certifi = "==2024.07.04"
pip = "==24.0"

[dev-packages]
Expand Down
91 changes: 79 additions & 12 deletions skipper/skipper/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from __future__ import absolute_import
import os
from typing import Union, Any
from celery import Celery # type: ignore
from celery.schedules import crontab # type: ignore
from django.conf import settings
Expand All @@ -17,6 +18,36 @@
app = Celery('skipper', result_backend=settings.CELERY_RESULT_BACKEND)


def int_or_crontab(input: Any, key: str) -> Union[int, crontab]:
try: # attempt to interpret input as positive int
out = int(input)
if out <= 0:
raise ValueError() # not positive, break out of try block
return out
except BaseException:
pass # try the next option
try: # attempt to interpret input as crontab
if isinstance(input, str) and len(input.split()) >= 5:
parts = input.split()

minute, hour, day_of_month, month, day_of_week = parts[:5]

return crontab(
minute=minute,
hour=hour,
day_of_week=day_of_week,
day_of_month=day_of_month,
month_of_year=month
)
else:
raise ValueError() # not a crontab, break out of try block
except BaseException:
# both failed, now raise
raise ValueError(f'{key} was passed in a bad format. '
'Either pass a positive integer or a string '
f'in crontab(5) format. The passed value was {input}')


# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
Expand All @@ -26,63 +57,99 @@
app.conf.beat_schedule = {
'event-queue-heartbeat': {
'task': '_3_wake_up_heartbeat_consumers',
'schedule': getattr(settings, 'SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE', 10),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE', 10),
'SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE'
),
'options': {
'queue': 'event_queue',
'expires': getattr(settings, 'SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE', 10)
'expires': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE', 10),
'SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE'
)
}
},
'event-cleanup-heartbeat': {
'task': '_3_wake_up_consumer_cleanup',
'schedule': getattr(settings, 'SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE', crontab(hour=1)),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE', crontab(hour=1)),
'SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE'
),
'options': {
'queue': 'event_cleanup'
}
},
'data_series-history-cleanup-heartbeat': {
'task': '_3_wake_up_data_series_history_cleanup',
'schedule': getattr(settings, 'SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE', crontab(hour=1)),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE', crontab(hour=1)),
'SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE'
),
'options': {
'queue': 'data_series_cleanup'
}
},
'file-registry-cleanup-heartbeat': {
'task': '_3_wake_up_file_registry_cleanup',
'schedule': getattr(settings, 'SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE', crontab(hour=1)),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE', crontab(hour=1)),
'SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE'
),
'options': {
'queue': 'file_registry_cleanup'
}
},
'data_series-meta-model-cleanup-heartbeat': {
'task': '_3_wake_up_data_series_meta_model_cleanup',
'schedule': getattr(settings, 'SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE', crontab(hour=1)),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE', crontab(hour=1)),
'SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE'
),
'options': {
'queue': 'data_series_cleanup'
}
},
'data_series-requeue-persist-data-point-chunk-heartbeat': {
'task': '_3_wake_up_requeue_persist_data_point_chunk',
'schedule': getattr(settings, 'SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE', 60 * 30),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE', 60 * 30),
'SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE'
),
'options': {
'queue': 'requeue_persist_data',
'expires': getattr(settings, 'SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE', 60 * 30)
'expires': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE', 60 * 30),
'SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE'
)
}
},
'health-check-heartbeat': {
'task': '_5_run_health_checks',
'schedule': getattr(settings, 'SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE', 30),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE', 30),
'SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE'
),
'options': {
'queue': 'health_check',
'expires': getattr(settings, 'SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE', 30),
'expires': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE', 30),
'SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE'
),
}
},
'common-cleanup-outstanding-tokens-heartbeat': {
'task': '_common_cleanup_outstanding_tokens',
# every hour
'schedule': getattr(settings, 'SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE', 60 * 60),
'schedule': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE', 60 * 60),
'SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE'
),
'options': {
'queue': 'event_cleanup',
'expires': getattr(settings, 'SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE', 60 * 60)
'expires': int_or_crontab(
getattr(settings, 'SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE', 60 * 60),
'SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE'
)
}
},
}
4 changes: 2 additions & 2 deletions skipper/skipper/dataseries/tasks/file_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
# This file is part of NF Compose
# [2019] - [2024] © NeuroForge GmbH & Co. KG

from typing import Optional
import datetime
from django.utils import timezone
from django_multitenant.utils import set_current_tenant # type: ignore

from skipper.environment import SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS
from skipper.core.celery import task
from skipper.core.models import default_media_storage
from skipper.core.models.tenant import Tenant
Expand All @@ -19,7 +19,7 @@
def actual_file_registry_cleanup() -> None:
file_registry.garbage_collect(
storage=default_media_storage,
older_than=datetime.datetime.now() - timezone.timedelta(days=7)
older_than=datetime.datetime.now() - timezone.timedelta(hours=SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS)
)


Expand Down
13 changes: 13 additions & 0 deletions skipper/skipper/environment_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,17 @@
# dataseries consumers

SKIPPER_CELERY_EVENT_QUEUE_MAX_EVENTS_PER_CONSUMER_HEARTBEAT = int(os.environ.get('SKIPPER_CELERY_EVENT_QUEUE_MAX_EVENTS_PER_CONSUMER_HEARTBEAT', 200))

SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE', 10))
SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE = os.environ.get('SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE', '0 1 * * *')
SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE = os.environ.get('SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE', '0 1 * * *')
SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS = int(os.environ.get('SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS', 24))
if SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS <= 0:
raise ValueError('SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS must be a positive integer')
SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE = os.environ.get('SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE', '0 1 * * *')
SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE = os.environ.get('SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE', '0 1 * * *')
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE', 60 * 30))
SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE', 30))
SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE = int(os.environ.get('SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE', 60 * 60))

SKIPPER_CONSUMER_PROXY_URL = os.environ.get('SKIPPER_CONSUMER_PROXY_URL', None)
10 changes: 10 additions & 0 deletions skipper/skipper/settings_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,16 @@ def task_upstream_dashboard(tenant: Tenant, user: Optional[Union[User, Anonymous
# CELERY STUFF
CELERY_BROKER_URL = environment.SKIPPER_CELERY_BROKER_URL

SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE = environment.SKIPPER_CELERY_EVENT_QUEUE_HEARTBEAT_SCHEDULE
SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE = environment.SKIPPER_CELERY_EVENT_QUEUE_CLEANUP_SCHEDULE
SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE = environment.SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_SCHEDULE
SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS = environment.SKIPPER_CELERY_FILE_REGISTRY_CLEANUP_MAX_AGE_HOURS
SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE = environment.SKIPPER_CELERY_DATA_SERIES_HISTORY_CLEANUP_SCHEDULE
SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE = environment.SKIPPER_CELERY_DATA_SERIES_META_MODEL_CLEANUP_SCHEDULE
SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE = environment.SKIPPER_CELERY_PERSIST_DATA_POINT_CHUNK_REQUEUE_SCHEDULE
SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE = environment.SKIPPER_CELERY_HEALTH_CHECK_HEARTBEAT_SCHEDULE
SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE = environment.SKIPPER_CELERY_OUTSTANDING_TOKENS_CLEANUP_SCHEDULE

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
Expand Down

0 comments on commit b5433fb

Please sign in to comment.