Recently, my attention was drawn, via a blog post, to a Python task queue implementation called Celery. The object of my interest was not so much Celery itself - though it does look both interesting and well written - but the syntax it uses for tasks.
App Engine's deferred library takes the 'higher-order function' approach - that is, you pass your function and its arguments to the 'defer' function - but I've never been entirely happy with it. Celery, in contrast, uses Python's support for decorators (one of my favorite language features) to create what, in my view, is a much neater and more flexible interface. Defining and calling a deferred function looks like this:
    def my_task_func(some_arg):
        # do something
        pass

    defer(my_task_func, 123)
Doing the same in Celery looks like this:
    @task
    def my_task_func(some_arg):
        # do something
        pass

    my_task_func.delay(123)
Using a decorator, Celery is able to modify the function it's decorating so that you can call it on the task queue using a much more intuitive syntax, with the function's original calling convention preserved. Let's first take a look at how this works, and then explore how we might make use of it ...
One common pattern when using the Task Queue API is known as 'task chaining'. You execute a task on the task queue, and at some point, determine that you're going to need another task, either to complete the work the current task is doing, or to start doing something new. Let's say you're doing the former, and your code looks something like this:
    def task_func():
        # Do some stuff
        deferred.defer(task_func)
        florb  # This line causes an error
I'm sure you can guess what happens here. You successfully do some work, successfully chain the next task, and then encounter an error. Your code throws an exception and returns a non-200 status code to the task queue, which notes the failure and schedules your task for re-execution. When it re-executes, the whole thing happens all over again (assuming the error is persistent rather than transient, as it is in the example above).
Meanwhile, the task you enqueued runs. Perhaps it also fails after chaining its next task. Now you have two repeatedly executing tasks. Soon you have 4 - then 8 - then 16 - and so forth. Disaster!
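The doubling is easy to see in a toy simulation. The sketch below does not use the real Task Queue API; it assumes a simplified model in which every task chains one successor, then fails with a persistent error and is rescheduled. The function name `simulate` is made up for this illustration.

```python
def simulate(rounds):
    """Return the number of queued tasks at the start of each round.

    Toy model: every task chains one successor and then fails, so the
    queue holds both the successor and the retry in the next round.
    """
    queue = ["task"]
    sizes = []
    for _ in range(rounds):
        sizes.append(len(queue))
        next_queue = []
        for failed_task in queue:
            next_queue.append("task")       # the successor it chained
            next_queue.append(failed_task)  # the retry of the failed task
        queue = next_queue
    return sizes


growth = simulate(5)
```

Under these assumptions `growth` comes out as 1, 2, 4, 8, 16: every round, each task leaves behind both a retry of itself and a brand new task.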
"Ah," you may say smugly, "I don't do anything important after chaining the next task ...
Last week, I blogged about cursors, a new feature in version 1.3.1 of the App Engine SDK. Today, I'm going to demonstrate a practical use for them: bulk datastore updates.
In both the Remote API and deferred articles, I used a (perhaps poorly named) 'mapper' class as an example of ways to use these libraries. In neither case was the class intended to be anything other than a sample use case for the library, but nevertheless, people have used the examples in production. The introduction of cursors provides a prime opportunity to introduce a more robust, yet simpler, version of the bulk updater concept.
First, let's define a few requirements for our bulk updater:
- Supports any query for which a cursor can be obtained
- Handles failure of individual updates gracefully
- Can fail the whole update process if enough errors are encountered
- Handles timeout errors, service unavailability, etc., transparently
- Can report completion to admins
As in the Remote API and deferred articles, we'll implement the updater as an abstract class, which individual updater implementations should subclass. Here's the basic interface:
    import logging
    import time
    from google.appengine.api import mail
    from google.appengine.ext ...
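Independently of the App Engine code above, the shape of the requirements can be sketched in plain Python. Everything here is hypothetical and is not the interface this post goes on to define: the class names `BulkUpdaterSketch` and `DoubleEvens`, the `fetch_batch`/`update`/`finish` hooks, and the use of a list index in place of a real datastore cursor are all assumptions made for illustration. The sketch covers per-item failures, a failure threshold, and a completion hook; it does not attempt timeout handling.

```python
class TooManyFailures(Exception):
    """Raised when the number of failed updates exceeds the limit."""


class BulkUpdaterSketch(object):
    """Hypothetical base class; subclasses override fetch_batch and update."""

    BATCH_SIZE = 2
    MAX_FAILURES = 1

    def __init__(self):
        self.updated = 0
        self.failed = 0

    def fetch_batch(self, cursor):
        """Return (items, next_cursor); an empty list means no more results."""
        raise NotImplementedError()

    def update(self, item):
        """Apply the update to one item; raise to signal a failure."""
        raise NotImplementedError()

    def finish(self, success):
        """Completion hook, e.g. for notifying admins. Does nothing here."""

    def run(self, cursor=None):
        while True:
            items, cursor = self.fetch_batch(cursor)
            if not items:
                self.finish(True)
                return True
            for item in items:
                try:
                    self.update(item)
                    self.updated += 1
                except Exception:
                    # One bad item should not stop the run, but too many should.
                    self.failed += 1
                    if self.failed > self.MAX_FAILURES:
                        self.finish(False)
                        raise TooManyFailures()


class DoubleEvens(BulkUpdaterSketch):
    """Toy subclass: a list plays the datastore, an index plays the cursor."""

    def __init__(self, data):
        super(DoubleEvens, self).__init__()
        self.data = data
        self.results = []

    def fetch_batch(self, cursor):
        start = cursor or 0
        end = start + self.BATCH_SIZE
        return self.data[start:end], end

    def update(self, item):
        if item % 2:
            raise ValueError("odd value")
        self.results.append(item * 2)


updater = DoubleEvens([2, 3, 4, 6])
completed = updater.run()
```

In this toy run the single odd value is counted as a failure and skipped, while the remaining items are still updated; a second failure would abort the whole run with `TooManyFailures`.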