Skip to content

Commit

Permalink
Refactor all Celery tasks to a separate file (#1938)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicomiguelino authored Jun 20, 2024
1 parent 4665137 commit 0fd729b
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 81 deletions.
66 changes: 66 additions & 0 deletions celery_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import sh

from celery import Celery
from datetime import timedelta
from lib import diagnostics
from lib.utils import (
connect_to_redis,
is_balena_app,
reboot_via_balena_supervisor,
shutdown_via_balena_supervisor,
)
from os import getenv, path
from retry.api import retry_call


CELERY_RESULT_BACKEND = getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
CELERY_BROKER_URL = getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=6)

r = connect_to_redis()
celery = Celery(
'Anthias Celery Worker',
backend=CELERY_RESULT_BACKEND,
broker=CELERY_BROKER_URL,
result_expires=CELERY_TASK_RESULT_EXPIRES
)


@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls cleanup() every hour.
sender.add_periodic_task(3600, cleanup.s(), name='cleanup')
sender.add_periodic_task(60*5, get_display_power.s(), name='display_power')


@celery.task(time_limit=30)
def get_display_power():
r.set('display_power', diagnostics.get_display_power())
r.expire('display_power', 3600)


@celery.task
def cleanup():
sh.find(path.join(getenv('HOME'), 'screenly_assets'), '-name', '*.tmp', '-delete')


@celery.task
def reboot_anthias():
"""
Background task to reboot Anthias
"""
if is_balena_app():
retry_call(reboot_via_balena_supervisor, tries=5, delay=1)
else:
r.publish('hostcmd', 'reboot')


@celery.task
def shutdown_anthias():
"""
Background task to shutdown Anthias
"""
if is_balena_app():
retry_call(shutdown_via_balena_supervisor, tries=5, delay=1)
else:
r.publish('hostcmd', 'shutdown')
4 changes: 2 additions & 2 deletions docker/Dockerfile.celery.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ ENV GIT_HASH=$GIT_HASH
ENV GIT_SHORT_HASH=$GIT_SHORT_HASH
ENV GIT_BRANCH=$GIT_BRANCH

CMD celery -A server.celery worker \
-B -n worker@screenly \
CMD celery -A celery_tasks.celery worker \
-B -n worker@anthias \
--loglevel=info \
--schedule \
/tmp/celerybeat-schedule
84 changes: 20 additions & 64 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,39 @@
import pydbus
import psutil
import re
import sh
import os

import traceback
import yaml
import uuid
from base64 import b64encode
from celery import Celery
from datetime import datetime, timedelta
from dateutil import parser as date_parser
from functools import wraps
from hurry.filesize import size
from mimetypes import guess_type, guess_extension
from os import getenv, makedirs, mkdir, path, remove, rename, statvfs, stat
from retry.api import retry_call
from urllib.parse import urlparse

from flask import Flask, escape, make_response, render_template, request, send_from_directory, url_for, jsonify
from flask import (
Flask,
escape,
make_response,
render_template,
request,
send_from_directory,
url_for,
jsonify,
)
from flask_cors import CORS
from flask_restful_swagger_2 import Api, Resource, Schema, swagger
from flask_swagger_ui import get_swaggerui_blueprint

from gunicorn.app.base import Application
from werkzeug.wrappers import Request

from celery_tasks import celery, shutdown_anthias, reboot_anthias

from lib import assets_helper
from lib import backup_helper
from lib import db
Expand All @@ -48,27 +56,25 @@

from lib.github import is_up_to_date
from lib.auth import authorized

from lib.utils import (
download_video_from_youtube, json_dump, is_docker,
get_active_connections, remove_connection,
get_balena_supervisor_version,
get_node_ip, get_node_mac_address,
get_video_duration,
is_balena_app, is_demo_node,
shutdown_via_balena_supervisor, reboot_via_balena_supervisor,
string_to_bool,
connect_to_redis,
url_fails,
validate_url,
)

from settings import CONFIGURABLE_SETTINGS, DEFAULTS, LISTEN, PORT, settings, ZmqPublisher, ZmqCollector
from settings import (
CONFIGURABLE_SETTINGS, DEFAULTS, LISTEN, PORT,
settings, ZmqPublisher, ZmqCollector,
)

HOME = getenv('HOME')
CELERY_RESULT_BACKEND = getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0')
CELERY_BROKER_URL = getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=6)

app = Flask(__name__)
app.debug = string_to_bool(os.getenv('DEBUG', 'False'))
Expand All @@ -77,56 +83,6 @@
api = Api(app, api_version="v1", title="Screenly OSE API")

r = connect_to_redis()
celery = Celery(
app.name,
backend=CELERY_RESULT_BACKEND,
broker=CELERY_BROKER_URL,
result_expires=CELERY_TASK_RESULT_EXPIRES
)


################################
# Celery tasks
################################

@celery.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# Calls cleanup() every hour.
sender.add_periodic_task(3600, cleanup.s(), name='cleanup')
sender.add_periodic_task(60*5, get_display_power.s(), name='display_power')


@celery.task(time_limit=30)
def get_display_power():
r.set('display_power', diagnostics.get_display_power())
r.expire('display_power', 3600)


@celery.task
def cleanup():
sh.find(path.join(HOME, 'screenly_assets'), '-name', '*.tmp', '-delete')


@celery.task
def reboot_screenly():
"""
Background task to reboot Screenly-OSE.
"""
if is_balena_app():
retry_call(reboot_via_balena_supervisor, tries=5, delay=1)
else:
r.publish('hostcmd', 'reboot')


@celery.task
def shutdown_screenly():
"""
Background task to shutdown Screenly-OSE.
"""
if is_balena_app():
retry_call(shutdown_via_balena_supervisor, tries=5, delay=1)
else:
r.publish('hostcmd', 'shutdown')


################################
Expand Down Expand Up @@ -1278,7 +1234,7 @@ class RebootScreenly(Resource):
}
})
def post(self):
reboot_screenly.apply_async()
reboot_anthias.apply_async()
return '', 200


Expand All @@ -1293,7 +1249,7 @@ class ShutdownScreenly(Resource):
}
})
def post(self):
shutdown_screenly.apply_async()
shutdown_anthias.apply_async()
return '', 200


Expand Down Expand Up @@ -1449,8 +1405,8 @@ def get(self):
api.add_resource(Info, '/api/v1/info')
api.add_resource(ResetWifiConfig, '/api/v1/reset_wifi')
api.add_resource(UpgradeScreenly, '/api/v1/upgrade_screenly')
api.add_resource(RebootScreenly, '/api/v1/reboot_screenly')
api.add_resource(ShutdownScreenly, '/api/v1/shutdown_screenly')
api.add_resource(RebootScreenly, '/api/v1/reboot')
api.add_resource(ShutdownScreenly, '/api/v1/shutdown')
api.add_resource(ViewerCurrentAsset, '/api/v1/viewer_current_asset')

try:
Expand Down
12 changes: 6 additions & 6 deletions static/js/settings.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ $().ready ->
$("#start-upgrade-btn").prop "disabled", no

$("#btn-reboot-system").click (e) ->
if confirm "Are you sure you want to reboot your Screenly?"
$.post "/api/v1/reboot_screenly"
if confirm "Are you sure you want to reboot your device?"
$.post "/api/v1/reboot"
.done (e) ->
($ "#request-error .alert").show()
($ "#request-error .alert").addClass "alert-success"
($ "#request-error .alert").removeClass "alert-danger"
($ "#request-error .msg").text "Screenly reboot has started successfully."
($ "#request-error .msg").text "Reboot has started successfully."
.fail (data, e) ->
($ "#request-error .alert").show()
($ "#request-error .alert").addClass "alert-danger"
Expand All @@ -146,13 +146,13 @@ $().ready ->
($ "#request-error .msg").text "The operation failed. Please reload the page and try again."

$("#btn-shutdown-system").click (e) ->
if confirm "Are you sure you want to shutdown your Screenly?"
$.post "/api/v1/shutdown_screenly"
if confirm "Are you sure you want to shutdown your device?"
$.post "/api/v1/shutdown"
.done (e) ->
($ "#request-error .alert").show()
($ "#request-error .alert").addClass "alert-success"
($ "#request-error .alert").removeClass "alert-danger"
($ "#request-error .msg").text "Screenly shutdown has started successfully. Soon you will be able to unplug the power from your Raspberry Pi."
($ "#request-error .msg").text "Device shutdown has started successfully. Soon you will be able to unplug the power from your Raspberry Pi."
.fail (data, e) ->
($ "#request-error .alert").show()
($ "#request-error .alert").addClass "alert-danger"
Expand Down
12 changes: 6 additions & 6 deletions static/js/settings.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion static/js/settings.js.map

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions tests/celery_tasks_test.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from os import getenv, path, listdir, system
import unittest

from server import celery as celeryapp
from server import cleanup
from celery_tasks import celery as celeryapp
from celery_tasks import cleanup


class CeleryTasksTestCase(unittest.TestCase):
Expand Down

0 comments on commit 0fd729b

Please sign in to comment.