Many Python programmers are familiar with Celery thanks to its integration with Django, a hugely popular web framework. A colleague recommended it to me recently when the need to schedule pruning of old Logstash indices came up during a development discussion. As I soon discovered, Celery is a fast and powerful way to turn any Python callable into a scheduled task or message-processing worker. We’re now using it to clip the aforementioned indices, as well as to perform other tasks such as injecting work payloads into a distributed queue.
Celery is a Python package, so the easiest way to get it into your virtualenv (or Docker container, or Vagrant env) is simply:
pip install celery
The gzipped tarball is only 1.3 megs, and while there are a few other dependencies (billiard, kombu, pytz), the installation takes less than a minute. Once it’s complete you’re ready to create a task. Let’s start with a simple script that downloads a web page and saves it to a file, and set it up to run every ten minutes. For downloading the page we’ll use the awesome requests package. First, the script itself:
from datetime import timedelta

from celery import Celery
import requests

app = Celery('page_saver')
app.conf.update(
    BROKER_URL='redis://localhost',
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERYBEAT_SCHEDULE={
        'save_page': {
            'task': 'page_saver.save_page',
            'schedule': timedelta(minutes=10)
        }
    }
)

@app.task(name='page_saver.save_page')
def save_page(url, file_name):
    # Fetch the page and only write the file if the request succeeded.
    response = requests.get(url)
    if response.status_code == requests.codes.ok:
        with open(file_name, 'wb') as f:
            f.write(response.content)
After importing Celery and the requests package the first thing this script does is create the app object and initialize it. The Celery app object marks this module as a Celery app, and the only parameter we’re passing here is the name, which must match the name of the module. In this example the script would be saved on the file system as “page_saver.py”.
The call to app.conf.update is one way of passing configuration data into the Celery object. There are several others, and in general most of the configuration options and settings are well beyond the scope of this post; the configuration section of the Celery documentation is a good introduction and links out to more detail.
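For example, if you’d rather keep settings out of the task module, Celery can also load them from a separate module with config_from_object. A minimal sketch, assuming a hypothetical celeryconfig.py sitting next to page_saver.py:

# celeryconfig.py (hypothetical settings module)
BROKER_URL = 'redis://localhost'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']

# page_saver.py -- load the settings above instead of calling app.conf.update
from celery import Celery

app = Celery('page_saver')
app.config_from_object('celeryconfig')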
The first setting, ‘BROKER_URL’, specifies the pipeline that Celery will use for passing messages between clients and workers. I’m using redis here because I always have it lying around, but you can also use RabbitMQ, or a database, although that isn’t recommended in production.
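For reference, here is roughly what the broker setting might look like for the other options; treat these as sketches rather than production advice:

BROKER_URL = 'redis://localhost:6379/0'               # redis, with explicit port and database
# BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # RabbitMQ with default credentials
# BROKER_URL = 'sqla+sqlite:///celery.db'             # a database via SQLAlchemy (not recommended in production)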
The next two settings, ‘CELERY_TASK_SERIALIZER’ and ‘CELERY_ACCEPT_CONTENT’, instruct Celery to use json encoding when talking to the task as a client and when accepting messages in the task as a server. Without these settings Celery will also allow pickle (and warn on startup), and nobody should receive pickle from a network port unless it is with a pair of tweezers. In any event, pickle is deprecated as the default serializer, so json is the way to go for that reason as well.
The last setting contains the schedule on which our task will be executed. This requires some explanation, and it is easiest to do it in tandem with an explanation of the actual task method. As you can see in the script we define a method named save_page, and decorate it with “@app.task()”, passing in a name for the task. The naming seems pretty much arbitrary, but I like the suggested convention of “appname.taskname.”
The decorator turns this callable into the entry point for a Celery task with the given name. The script could contain many more tasks, each being individually callable, schedulable, etc., but for this example one will suffice, and I think I like a 1:1 mapping between app and task anyway. The actual implementation of the save_page method is self-explanatory, and contains no error handling or retry logic for brevity’s sake.
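If you do want retries and logging, Celery supports them on the task itself. Here is a rough sketch (not part of the example above) that binds the task so it can call self.retry when the download fails:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(name='page_saver.save_page', bind=True, max_retries=3)
def save_page(self, url, file_name):
    try:
        response = requests.get(url)
        response.raise_for_status()
    except requests.RequestException as exc:
        # Log the failure and retry in a minute, up to max_retries times.
        logger.warning('Fetching %s failed: %s', url, exc)
        raise self.retry(exc=exc, countdown=60)
    with open(file_name, 'wb') as f:
        f.write(response.content)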
With the task defined, the script above constitutes a complete Celery worker, and you can exercise it at any time by using Celery to activate it and send it a message. For example, save the script to a folder, cd into that folder, and run:
celery -A page_saver worker --loglevel=INFO
You should see a bunch of output indicating that Celery is starting up and has registered the task. Once it is up and running, open another terminal and start the Python interpreter. Enter the following statements:
>>> from page_saver import save_page
>>> save_page.delay('https://news.ycombinator.com/', 'hn.txt')
The task object, save_page, acts as a proxy for the same task running in the worker process in the other terminal session. The delay method executes the call asynchronously, and Celery provides additional methods for accessing results when they become available. In this case the only result is the file output.
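To actually read a return value back from the caller you also need a result backend, which this example doesn’t configure. A minimal sketch, assuming redis doubles as the backend (add CELERY_RESULT_BACKEND='redis://localhost' to the app.conf.update call):

result = save_page.delay('https://news.ycombinator.com/', 'hn.txt')
result.ready()          # False until the worker has finished
result.get(timeout=30)  # blocks until the result arrives; None here, since save_page returns nothing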
So, that brings us back to the schedule. To have a scheduled task we need a scheduler: a client that wakes up and essentially calls delay() on our task when a timer expires. Fortunately Celery includes celery beat, which does exactly that. You can run celery beat as a stand-alone service and use the same schedule configuration schema, but you can also run it in tandem with the worker using the -B command line switch:
celery -A page_saver worker -B --loglevel=INFO
The schedule configuration in the example script establishes a single schedule named “save_page” and tells it to run the task named “page_saver.save_page” every ten minutes using a timedelta. You can also set the ‘schedule’ field to a crontab object and use all of the same options available to a cron job, so there is a lot of flexibility.
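For example, swapping the timedelta for a crontab object that fires at the top of every hour might look like this (a sketch using the same schedule entry as above):

from celery.schedules import crontab

CELERYBEAT_SCHEDULE={
    'save_page': {
        'task': 'page_saver.save_page',
        'schedule': crontab(minute=0)  # run at minute 0 of every hour
    }
}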
And that’s about it for this simple example, which incidentally is complete enough to handle a lot of system maintenance tasks. Add some error-handling, logging and notification and you’re ready to go. If your needs go beyond simple repeatable tasks you’ll find Celery has a lot of options for executing distributed workloads and returning results asynchronously. Have fun!