Distributed Task Locking in Celery
Background
We use the Celery distributed task queue library at work, which is great for running asynchronous tasks across multiple processes and servers. Celery has both user-initiated and periodic (think cron replacement) tasks, and we have found in practice that the system distributes tasks quite nicely across our farm of celery servers.
One issue we have is that for several of our periodic tasks, we need to ensure that only one task is running at a time, and that later instances of the same periodic task are skipped if a previous incarnation is still running.
The Celery documentation has a cookbook recipe for this scenario: “Ensuring a task is only executed one at a time”. The crux of the solution is to make a distributed lock using the Django cache (memcached in the example) with the following lambdas:
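Condensed, the recipe boils down to something like this sketch (the key name and expiration value are illustrative, not the exact cookbook code):

```python
from django.core.cache import cache

LOCK_EXPIRE = 60 * 5  # lock expires after five minutes, in case the task dies

lock_id = "my-periodic-task-lock"  # illustrative lock key

# cache.add() is atomic: it only sets the key if it does not already exist
acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
release_lock = lambda: cache.delete(lock_id)

if acquire_lock():
    try:
        pass  # the actual task body goes here
    finally:
        release_lock()
```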
Non-persistent Locks?
This approach works fine if the cache is shared across all celery worker nodes and the cache is persistent. However, if memcached (or some other non-persistent cache) is used and (1) the cache daemon crashes or (2) the cache key is culled before the appropriate expiration time / lock release, then you have a race condition where two or more tasks could simultaneously acquire the task lock. This “distributed cache lock” approach has been discussed in various posts, which all acknowledge the danger of relying on memcached for persistent data.
Distributed Locks with Redis
As noted by one of the links above, if you like using the cache for distributed locks, the simplest solution to this problem is to switch to memcachedb, which is not a caching solution per se but rather a persistent key-value store that implements the memcached interface.
However, for our system, using memcached as our Django cache is great in its current non-persistent form for what it is – a cache. So, I investigated more generally to find a high performance, persistent key-value store (ideally with a decent Python interface). After reviewing a lot of neat and interesting systems, I finally settled on Redis. Redis provides a very high performance key-value store (data is maintained in memory) with persistence (using the append only file feature) and distribution / replication. As an added bonus, I found the server setup, installation and CLI interaction to be very straightforward.
So, back to locking… The Redis Python client already has a Lock class with “with” statement support:
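A minimal sketch of the blocking form, assuming a Redis server on localhost (do_work() is a placeholder for the actual task body):

```python
import redis

redis_client = redis.Redis()  # assumes a local Redis server

# Blocking: waits until the lock is acquired, releases it when the block exits
with redis_client.lock("my-task-lock"):
    do_work()  # placeholder for the actual work
```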
However, the above example is a blocking lock, and for the “single task” issue we want a non-blocking lock that simply exits if the lock is not acquired:
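Something along these lines (again a sketch; the key name and placeholder work are illustrative):

```python
import redis

redis_client = redis.Redis()

lock = redis_client.lock("my-task-lock")

# blocking=False returns immediately instead of waiting for the lock
if lock.acquire(blocking=False):
    try:
        do_work()  # placeholder for the actual work
    finally:
        lock.release()
else:
    pass  # another instance holds the lock; skip this run
```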
Beyond this simple example, the Lock class implements key expiration (via the Redis setnx command) to enable timeouts in the python client.
Enforced Single Celery Task
So, bringing this back to our celery tasks, we can use this distributed lock to have our tasks try to acquire a non-blocking lock, and exit if the lock isn’t acquired. Also, we want to set a lock timeout (a generous overestimate of the task’s expected duration) so that tasks will eventually be able to re-acquire the lock if some task / celery node hard crashes or goes into an unresponsive state before releasing the lock.
All of this can be put together as a decorator around a Task.run() method:
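A sketch of such a decorator follows; the name single_instance_task and the lock key prefix are illustrative choices, not necessarily the exact original code:

```python
from functools import wraps

import redis

REDIS_CLIENT = redis.Redis()  # assumes a local Redis server

def single_instance_task(timeout):
    """Allow only one running instance of the decorated task at a time."""
    def task_exc(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ret_value = None
            have_lock = False
            # timeout makes the lock expire even if the holder hard crashes
            lock = REDIS_CLIENT.lock("celery-single-instance-" + func.__name__,
                                     timeout=timeout)
            try:
                have_lock = lock.acquire(blocking=False)
                if have_lock:
                    ret_value = func(*args, **kwargs)
            finally:
                if have_lock:
                    lock.release()
            return ret_value
        return wrapper
    return task_exc
```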
Note that this decorator preserves task return values. If your tasks don’t have return values, you can get rid of the ret_value code.
Using the decorator is easy – just annotate a task run() method:
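For instance, with the class-based periodic task API of the Celery versions current at the time (run_every and the timeout value below are illustrative):

```python
from datetime import timedelta

from celery.task import PeriodicTask

class MyLockedTask(PeriodicTask):
    run_every = timedelta(minutes=5)

    @single_instance_task(timeout=10 * 60)  # generous overestimate of run time
    def run(self, **kwargs):
        pass  # the actual periodic work goes here
```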
… and your task will only ever have one running instance at any given time.