Skip to content

Commit

Permalink
Merge pull request #21 from alanjds/lock-clients
Browse files Browse the repository at this point in the history
Lock clients. Only one calls the Watchdog.
  • Loading branch information
alanjds authored Jul 12, 2018
2 parents 70b7b13 + b0f3ae0 commit 00f9272
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 21 deletions.
22 changes: 6 additions & 16 deletions celery_serverless/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
logger.setLevel(os.environ.get('CELERY_SERVERLESS_LOGLEVEL'))
print('Celery serverless loglevel:', logger.getEffectiveLevel())

from redlock import RedLock
from redis import StrictRedis
from timeoutcontext import timeout as timeout_context
from celery_serverless.watchdog import Watchdog, KombuQueueLengther, build_intercom, invoke_watchdog
from celery_serverless.invoker import invoke_watchdog
from celery_serverless.watchdog import Watchdog, KombuQueueLengther, build_intercom, get_watchdog_lock
from celery_serverless.worker_management import spawn_worker, attach_hooks
hooks = []

Expand All @@ -32,7 +33,7 @@ def worker(event, context, intercom_url=None):
try:
remaining_seconds = context.get_remaining_time_in_millis() / 1000.0
except Exception as e:
logger.exception('Could not got remaining_seconds. Is the context right?')
logger.exception('Could not get remaining_seconds. Is the context right?')
remaining_seconds = 5 * 60 # 5 minutes by default

softlimit = remaining_seconds-30.0 # Poke the job 30sec before the abyss
Expand Down Expand Up @@ -62,22 +63,13 @@ def worker(event, context, intercom_url=None):

@handler_wrapper
def watchdog(event, context):
lock_name = os.environ.get('CELERY_SERVERLESS_LOCK_NAME', 'celery_serverless:watchdog')
lock_url = os.environ.get('CELERY_SERVERLESS_LOCK_URL')
assert lock_url, 'The CELERY_SERVERLESS_LOCK_URL envvar should be set. Even to "disabled" to disable it.'

queue_url = os.environ.get('CELERY_SERVERLESS_QUEUE_URL')
assert queue_url, 'The CELERY_SERVERLESS_QUEUE_URL envvar should be set. Even to "disabled" to disable it.'

intercom_url = os.environ.get('CELERY_SERVERLESS_INTERCOM_URL')
assert intercom_url, 'The CELERY_SERVERLESS_INTERCOM_URL envvar should be set. Even to "disabled" to disable it.'

if lock_url == 'disabled':
lock = None
elif lock_url.startswith(('redis://', 'rediss://')):
lock = RedLock(lock_name, connection_details=[{'url': node} for node in lock_url.split(',')])
else:
raise RuntimeError("This URL is not supported. Only 'redis[s]://...' is supported for now")
lock, lock_name = get_watchdog_lock(enforce=True)

if queue_url == 'disabled':
watched = None
Expand Down Expand Up @@ -106,11 +98,9 @@ def watchdog(event, context):
lock.release()
except (RuntimeError, AttributeError):
pass
else:
time.sleep(1) # Let distributed locks to propagate

logger.info('All set. Reinvoking the Watchdog')
_, future = invoke_watchdog()
_, future = invoke_watchdog(check_lock=False)
future.result()
logger.info('Done reinvoking another Watchdog')

Expand Down
33 changes: 31 additions & 2 deletions celery_serverless/invoker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import logging
import codecs
import json
import multiprocessing
from pprint import pformat

import dirtyjson
Expand All @@ -29,8 +30,10 @@
except ImportError: # Boto3 is an optional extra on setup.py
lambda_client = None


from .cli_utils import run
from .utils import run_aio_on_thread
from .utils import run_aio_on_thread, get_client_lock


CELERY_HANDLER_PATHS = {
'worker': 'celery_serverless.handler_worker',
Expand Down Expand Up @@ -221,5 +224,31 @@ def invoke_worker(config=None, data=None, *args, **kwargs):
return invoke(target='worker', extra_data=data or {}, *args, **kwargs)


def invoke_watchdog(config=None, data=None, *args, **kwargs):
def invoke_watchdog(config=None, data=None, check_lock=True, *args, **kwargs):
    """Invoke the Watchdog lambda, unless its distributed lock is already held.

    With ``check_lock=True`` (default), peek at the Watchdog lock first and
    give up early when some other Watchdog is running. Returns the pair
    ``(invoked_flag, result_or_error)`` like the other invokers.
    """
    from .watchdog import get_watchdog_lock  # local import avoids a circular dependency

    if check_lock:
        watchdog_lock, _ = get_watchdog_lock(enforce=False)
        if watchdog_lock.locked():
            logger.info('Watchdog lock already held. Giving up.')
            return False, RuntimeError('Watchdog lock already held')
    return invoke(target='watchdog', extra_data=data or {}, *args, **kwargs)


def client_invoke_watchdog(check_lock=True, blocking_lock=False, *args, **kwargs):
    """Invoke the Watchdog on behalf of a client, guarded by the client lock.

    Only one client at a time should actually call the Watchdog; the rest
    give up when the (process-local or Redis-backed) client lock is held.
    ``blocking_lock`` is passed to ``Lock.acquire`` as the blocking flag.
    Returns ``(invoked_flag, result_or_error)``.
    """
    if not check_lock:
        logger.debug('Not checking client lock')
        return invoke_watchdog(check_lock=True, *args, **kwargs)

    client_lock, _ = get_client_lock()
    acquired = client_lock.acquire(blocking_lock)
    if not acquired:
        logger.info('Could not get Client lock. Giving up.')
        return False, RuntimeError('Client lock already held')

    logger.debug('Got the client lock')
    try:
        # NOTE(review): `with_lock` is not a parameter of invoke_watchdog; it
        # flows through **kwargs into invoke(). Confirm invoke() accepts it —
        # it may have been meant as `check_lock=True`.
        return invoke_watchdog(with_lock=True, *args, **kwargs)
    finally:
        # Always release, even when invoke_watchdog raises.
        logger.debug('Releasing the client lock')
        client_lock.release()
2 changes: 1 addition & 1 deletion celery_serverless/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def trigger_invoke(task=None, *args, **kwargs):
logging.warning("Serverless worker will probable not get the task,"
" as its queue %s is probable not being listened there",
kwargs['queue'])
return invoker.invoke()
return invoker.client_invoke_watchdog()


class TriggerServerlessBeforeMixin(object):
Expand Down
71 changes: 71 additions & 0 deletions celery_serverless/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,23 @@
# coding: utf-8
import os
import asyncio
from threading import Thread
from inspect import isawaitable
import importlib

# Optional redis support: when redis-py is installed, backfill a `locked()`
# method on its Lock class so callers can probe a lock without acquiring it.
try:
    from redis import StrictRedis

    # MONKEYPATCH until https://github.com/andymccurdy/redis-py/pull/1007/ is merged
    import redis.lock
    def __locked(self):
        # A Redis lock is considered held iff its key currently exists.
        if self.redis.get(self.name) is not None:
            return True
        return False
    if not hasattr(redis.lock.Lock, 'locked'):
        # Only patch when `locked` is missing, so a newer redis-py wins.
        redis.lock.Lock.locked = __locked
except ImportError:
    # redis is an optional dependency; lock helpers fall back to stdlib locks.
    pass

slave_loop = None # loop
slave_thread = None # type: Thread
Expand All @@ -28,3 +44,58 @@ def run_aio_on_thread(coro):
if not slave_thread:
_start_thread()
return asyncio.run_coroutine_threadsafe(coro, slave_loop)


def _get_lock(lock_url='', lock_url_env='CELERY_SERVERLESS_LOCK_URL', lock_url_default='dummy_threading://',
lock_name='', lock_name_env='CELERY_SERVERLESS_LOCK_NAME', lock_name_default='',
enforce=True) -> '(Lock, str)':
"""
Build or fetch a lock from `lock_url` or `lock_url_env` envvar,
falling back to `lock_url_default` contents. Can be a python module having
a Lock inside. E.g: 'multiprocessing://'
If the lock needs it, use `lock_url` or `lock_name_env` envvar
for the lock name, falling back to `lock_name_default` contents.
Passing `enforce=True` will raise a RuntimeError if lock url is empty.
"""
lock_name = os.environ.get(lock_name_env, lock_name_default)
lock_url = os.environ.get(lock_url_env, '')
if enforce:
if lock_url == 'disabled':
lock_url = ''
else:
assert lock_url, ('The %s envvar should be set. Even to "disabled" to disable it.' % lock_url_env)

if lock_url.startswith(('redis://', 'rediss://')):
redis = StrictRedis.from_url(lock_url)
lock = redis.lock(lock_name)
elif lock_url_default and not lock_url:
defaultlock_module_name = lock_url_default.partition('://')[0]
defaultlock_module = importlib.import_module(defaultlock_module_name)
lock = defaultlock_module.Lock()
else:
raise RuntimeError("This URL is not supported. Only 'redis[s]://...' is supported for now")
return lock, lock_name


def get_watchdog_lock(enforce=True) -> '(Lock, str)':
    """Return ``(lock, name)`` guarding the Watchdog singleton invocation."""
    options = dict(
        lock_url_env='CELERY_SERVERLESS_LOCK_URL',
        lock_url_default='dummy_threading://',
        lock_name_env='CELERY_SERVERLESS_LOCK_NAME',
        lock_name_default='celery_serverless:watchdog',
    )
    return _get_lock(enforce=enforce, **options)


# Process-wide memo for the client lock, so every caller shares one instance.
_CLIENT_LOCK = {}

def get_client_lock(enforce=False) -> '(Lock, str)':
    """Return the memoized per-process client ``(lock, name)`` pair.

    The lock is built on first use and cached in ``_CLIENT_LOCK``; later
    calls return the cached pair, so ``enforce`` only takes effect on the
    very first call.
    """
    if not _CLIENT_LOCK:
        lock, lock_name = _get_lock(
            lock_url_env='CELERY_SERVERLESS_LOCK_URL', lock_url_default='multiprocessing://',
            lock_name_env='CELERY_SERVERLESS_LOCK_NAME', lock_name_default='celery_serverless:watchdog-client',
            enforce=enforce,
        )
        _CLIENT_LOCK['lock'] = lock
        _CLIENT_LOCK['lock_name'] = lock_name
    return _CLIENT_LOCK['lock'], _CLIENT_LOCK['lock_name']
7 changes: 5 additions & 2 deletions celery_serverless/watchdog.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding: utf-8
import os
import time
import uuid
import logging
Expand All @@ -13,7 +14,9 @@
from redis import StrictRedis
from kombu import Connection
from kombu.transport import pyamqp
from celery_serverless.invoker import invoke_worker, invoke_watchdog
from celery_serverless.invoker import invoke_worker

from .utils import get_watchdog_lock

logger = logging.getLogger(__name__)
logger.setLevel('DEBUG')
Expand Down Expand Up @@ -104,7 +107,7 @@ def _done_callback(fut):
return success_calls

def monitor(self):
locked = self._lock.acquire()
locked = self._lock.acquire(False)
if not locked:
logger.info('Could not get the lock. Giving up.')
return 0
Expand Down
1 change: 1 addition & 0 deletions tests/test_celery_serverless.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def test_watchdog_needs_envvar(envname):
else:
raise RuntimeError('Had not raised an AssertionError')


@pytest.mark.timeout(30)
def test_watchdog_monitor_redis_queues(monkeypatch):
queue_url = 'redis://'
Expand Down
82 changes: 82 additions & 0 deletions tests/test_invoke.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""Tests for `celery_worker_serverless` package."""

import time
import uuid
import logging
import pytest
from pytest_shutil import env
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

from celery_serverless import watchdog, invoker

logger = logging.getLogger(__name__)


@pytest.mark.timeout(30)
def test_watchdog_monitor_redis_queues(monkeypatch):
    # End-to-end check that of 20 concurrent clients, exactly ONE manages to
    # invoke the (simulated) Watchdog — the others lose the client lock race.
    # Requires a local Redis; skipped otherwise.
    lock_url = 'redis://'

    redis = pytest.importorskip('redis')
    from redis.exceptions import ConnectionError

    conn = redis.StrictRedis.from_url(lock_url)
    try:
        conn.ping()
    except ConnectionError as err:
        pytest.skip('Redis server is not available: %s' % err)

    # Redis is available.
    # Let's set it up before testing the watchdog

    # Counts how many times the fake Watchdog actually ran (nonlocal below).
    times_invoked = 0

    def _simulate_watchdog_invocation(*args, **kwargs):
        """
        Simulates a Watchdog invocation cycle via Redis locks changes
        """
        logger.warning('Simulating an Watchdog invocation: START')

        nonlocal times_invoked
        times_invoked += 1

        # 1) Watchdog fetches its lock
        lock, lock_name = watchdog.get_watchdog_lock(enforce=True)

        # 2) It runs with the lock or cancels
        # Non-blocking acquire: a second concurrent invocation must bail out.
        with_lock = lock.acquire(False)
        if not with_lock:
            logger.info('Watchdog COULD NOT got the lock')
        else:
            logger.info('Watchdog GOT the lock')
            # Hold the lock long enough for the other 19 clients to collide.
            time.sleep(5)
            lock.release()
            logger.info('Watchdog RELEASED the lock')

        logger.warning('Simulating an Watchdog invocation: END')

    # Start from a clean Redis so stale locks cannot skew the test.
    conn.flushdb()
    with env.set_env(CELERY_SERVERLESS_LOCK_URL=lock_url):
        _simulate_watchdog_invocation()  # Just be sure that it works.

    with ThreadPoolExecutor() as executor:
        # Replace the real lambda invocation with the simulation above.
        monkeypatch.setattr(
            'celery_serverless.invoker.invoke',
            lambda *ar, **kw: (True, executor.submit(_simulate_watchdog_invocation)),
        )

        client_futures = []
        times_invoked = 0  # reset: the warm-up call above counted once

        with env.set_env(CELERY_SERVERLESS_LOCK_URL=lock_url):
            # 20 clients race; the client lock should serialize them down to 1.
            for i in range(20):
                client_futures.append(executor.submit(invoker.client_invoke_watchdog))

        client_invokes = [fut.result() for fut in as_completed(client_futures)]
        result_flags, results = zip(*client_invokes)

        assert times_invoked == 1, 'More than one client succeeded to invoke_watchdog'
        assert len([i for i in result_flags if i == True]) == 1, 'More than one client got a lock'

0 comments on commit 00f9272

Please sign in to comment.