Using Celery to schedule Python tasks

Many Python programmers are familiar with Celery thanks to its integration with Django, a hugely popular web framework. A colleague recommended it to me recently when the need to schedule pruning of old Logstash indices came up during a development discussion. As I soon discovered, Celery is a fast and powerful way to turn any Python callable into a scheduled task or a message-processing worker. We’re now using it to prune the aforementioned indices, as well as to perform other tasks such as injecting work payloads into a distributed queue.

Celery is a Python package, so the easiest way to get it into your virtualenv (or Docker container, or Vagrant environment) is simply:

pip install celery

The gzipped tarball is only 1.3 megs, and while there are a few other dependencies (billiard, kombu, pytz) the installation takes less than a minute. Once it is complete you’re ready to create a task. Let’s start with a simple script that downloads a web page and saves it to a file. We’ll create the script and set it up to run every ten minutes. For downloading the page we’ll use the awesome requests package. First, the script itself.

from datetime import timedelta
from celery import Celery
import requests

app = Celery('page_saver')
app.conf.update(
    BROKER_URL='redis://localhost',
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERYBEAT_SCHEDULE={
        'save_page': {
            'task': 'page_saver.save_page',
            'schedule': timedelta(minutes=10),
            # Arguments that beat passes to the task on each scheduled run
            'args': ('https://news.ycombinator.com/', 'hn.txt')
        }
    }
)

@app.task(name='page_saver.save_page')
def save_page(url, file_name):
    response = requests.get(url)
    if response.status_code == requests.codes.ok:
        # response.content is bytes, which matches the binary file mode
        with open(file_name, 'wb') as f:
            f.write(response.content)

After importing Celery and the requests package the first thing this script does is create the app object and initialize it. The Celery app object marks this module as a Celery app, and the only parameter we’re passing here is the name, which must match the name of the module. In this example the script would be saved on the file system as “page_saver.py”.

The call to app.conf.update is one way of passing configuration data to the Celery object. There are several others, and in general most of the configuration options and settings are well beyond the scope of this post; the Celery configuration documentation is a good place to start for more information.
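As one example of an alternative, you can keep the settings in a separate module and load them by name. The module name celeryconfig.py below is just a convention I'm assuming for this sketch, not something the script above requires.

from celery import Celery

app = Celery('page_saver')
# Load settings from a celeryconfig.py module importable on the Python path
app.config_from_object('celeryconfig')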

The first setting, ‘BROKER_URL’, specifies the transport that Celery will use for passing messages between clients and workers. I’m using Redis here because I always have it lying around, but you can also use RabbitMQ, or even a database, although that last option isn’t recommended in production.
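For reference, broker URLs for the two common transports look roughly like this; the hosts, ports and credentials shown are placeholders for whatever your environment uses.

BROKER_URL = 'redis://localhost:6379/0'               # Redis, database 0
# BROKER_URL = 'amqp://guest:guest@localhost:5672//'  # RabbitMQ, default vhost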

The next two settings, ‘CELERY_TASK_SERIALIZER’ and ‘CELERY_ACCEPT_CONTENT’, instruct Celery to use json encoding when talking to the task as a client, and when accepting messages in the task as a server. Without these settings Celery will also allow pickle (and warn on startup), and nobody should receive pickle from a network port unless it is with a pair of tweezers. In any event, pickle is on its way out as the default serializer, so json is the way to go for that reason as well.

The last setting contains the schedule on which our task will be executed. This requires some explanation, and it is easiest to do that in tandem with an explanation of the actual task method. As you can see in the script, we define a method named save_page and decorate it with “@app.task()”, passing in a name for the task. The naming is pretty much arbitrary, but I like the suggested convention of “appname.taskname”.

The decorator turns this callable into the entry point for a Celery task with the given name. The script could contain many more tasks, each individually callable, schedulable, and so on, but for this example one will suffice, and I like a 1:1 mapping between app and task anyway. The actual implementation of the save_page method is self-explanatory, and it contains no error handling or retry logic for brevity’s sake; a rough sketch of what that might look like follows.
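The sketch below is only an illustration; the retry count and delay are arbitrary choices, not something the example requires.

@app.task(name='page_saver.save_page', bind=True, max_retries=3)
def save_page(self, url, file_name):
    try:
        response = requests.get(url)
        # Raise an exception for 4xx/5xx responses so they trigger a retry
        response.raise_for_status()
    except requests.RequestException as exc:
        # Re-queue the task and try again in 60 seconds, up to max_retries times
        raise self.retry(exc=exc, countdown=60)
    # response.content is bytes, matching the binary file mode
    with open(file_name, 'wb') as f:
        f.write(response.content)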

With the task defined, the script above constitutes a complete Celery worker, and the task can be run at any time by starting the worker and sending it a message. For example, save the script to a folder, then cd into that folder and do this:

celery -A page_saver worker --loglevel=INFO

You should see a bunch of output indicating that Celery has started and registered the task. Once it is up and running, open up another terminal and start the Python interpreter. Enter the following statements:

>>> from page_saver import save_page
>>> save_page.delay('https://news.ycombinator.com/', 'hn.txt')

The task object, save_page, serves as a proxy to call itself in the worker process running in the other terminal session. The delay method executes the call asynchronously, and Celery provides additional methods to access the results when they become available. In this case the only result is the file output.
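If you do want to inspect results programmatically, delay() returns an AsyncResult that you can poll or block on. Note that this sketch assumes a result backend has been configured, for example by adding CELERY_RESULT_BACKEND='redis://localhost' to the app.conf.update() call above; the script as written doesn’t do that.

result = save_page.delay('https://news.ycombinator.com/', 'hn.txt')
result.ready()          # False until the worker has finished the task
result.get(timeout=30)  # blocks until the return value (None here) is available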

So, that brings us back to the schedule. To have a scheduled task we need a scheduler: a client to wake up and essentially call delay() on our task when a timer expires. Fortunately Celery includes beat, which does exactly that. You can run celery beat as a stand-alone process and use the same schedule configuration schema, but you can also run it in tandem with the worker using the -B command-line switch:

celery -A page_saver worker -B --loglevel=INFO

The schedule configuration in the example script establishes a single schedule entry named “save_page” and tells beat to run the task named “page_saver.save_page” every ten minutes using a timedelta, passing the url and file name to the task through the ‘args’ entry. You can also set the ‘schedule’ field to a crontab object and use all of the same options available to a cron job, so there is a lot of flexibility.
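For instance, a crontab-based entry might look like the sketch below; the particular day and time are just an illustration.

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'save_page': {
        'task': 'page_saver.save_page',
        # Run at 07:30 every Monday instead of every ten minutes
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': ('https://news.ycombinator.com/', 'hn.txt')
    }
}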

And that’s about it for this simple example, which incidentally is complete enough to handle a lot of system maintenance tasks. Add some error-handling, logging and notification and you’re ready to go. If your needs go beyond simple repeatable tasks you’ll find Celery has a lot of options for executing distributed workloads and returning results asynchronously. Have fun!

10 thoughts on “Using Celery to schedule Python tasks”

  1. Great introduction, super clear. Does the scheduler repeat an already queued task? If not, how does it provide the args to the task (url and file_name)?

  2. Thanks, glad you found it useful. The beat scheduler doesn’t reuse the arguments from a delay() call; on every scheduled invocation it sends whatever you put in the ‘args’ entry of the CELERYBEAT_SCHEDULE item, as in the example above.

  3. Thanks for the great post, but this is not working for me. Kindly help me to debug this issue.

  4. Awesome tutorial!

    But when I run the script, the worker gives me :

    UnicodeEncodeError: 'ascii' codec can't encode characters in position 4915-4916: ordinal not in range(128)

    Any way to solve it?

  5. Hi, Kevin. Can you show me the line it is happening on? Any time you see that specific error it means that an implicit or explicit conversion from unicode to str is taking place. When the interpreter has to convert text from unicode to str and an encoding is not specified explicitly (as in u"something".encode("utf-8")), the interpreter assumes ascii is the destination encoding. If some of the characters in the unicode string can’t be encoded in ascii then you get this error.

  6. Kevin, I took another look at the script, and I assume you’re getting the error on the file write? What’s going on there is that the text attribute of the response returned by requests.get() is a unicode object, while the file, opened in binary mode, expects a str. So a conversion is forced there, and the page you’re grabbing has an encoding other than ascii (as almost all do). If you don’t know the actual encoding of the content try file.write(response.text.encode('utf-8')). It feels hacky, but it fixes things for me about 90% of the time, and other solutions for situations where the encoding isn’t actually known are even worse.

  7. Hi Mark, is it possible to execute another Celery task inside the scheduled task?
    For example, a Celery task that writes the file instead of doing it directly inside save_page().

  8. I have a case where multiple users will use a Python program we developed that uses Celery. Is there a way to distinguish Celery tasks by who sent them (program instance)? We would like to limit the concurrent tasks a single user can run so that they do not take up all of the concurrent Celery workers.

  9. I see my task executed only once by the “worker” even though the “beat” is sending the task every minute. What am I missing?

  10. example of log:
    [2017-09-01 01:02:46,420: INFO/Beat] beat: Starting…
    [2017-09-01 01:02:46,631: INFO/MainProcess] Connected to amqp://shotgunevents:**@shotgun-dev-evt01.wdi.cloud:5672/shotgun_events
    [2017-09-01 01:02:46,793: INFO/MainProcess] mingle: searching for neighbors
    [2017-09-01 01:02:46,911: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
    [2017-09-01 01:02:48,072: INFO/MainProcess] mingle: all alone
    [2017-09-01 01:02:48,741: INFO/MainProcess] Events of group {task} enabled by remote.
    [2017-09-01 01:03:00,004: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
    [2017-09-01 01:04:00,033: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
    [2017-09-01 01:05:00,081: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
    [2017-09-01 01:05:00,099: INFO/MainProcess] Received task: tasks.add[5e4f9e8d-0e9e-49b9-a33b-9c42cd6b0157]
    [2017-09-01 01:05:00,473: WARNING/PoolWorker-4] 5
    [2017-09-01 01:05:00,479: INFO/PoolWorker-4] Task tasks.add[5e4f9e8d-0e9e-49b9-a33b-9c42cd6b0157] succeeded in 0.10922038462s: 5
    [2017-09-01 01:06:00,042: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
    [2017-09-01 01:07:00,062: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
    [2017-09-01 01:08:00,042: INFO/Beat] Scheduler: Sending due task FirstScheduledEvent (tasks.add)
