Task Queue task chaining done right

One common pattern when using the Task Queue API is known as 'task chaining'. You execute a task on the task queue, and at some point, determine that you're going to need another task, either to complete the work the current task is doing, or to start doing something new. Let's say you're doing the former, and your code looks something like this:

from google.appengine.ext import deferred

def task_func():
  # Do some stuff
  deferred.defer(task_func)
  florb # This line raises a NameError

I'm sure you can guess what happens here. You successfully do some work, successfully chain the next task, then encounter an error. Your code throws an exception and returns a non-200 status code to the task queue, which notes the failure and schedules your task for re-execution. When it re-executes, the whole thing happens all over again (assuming the error is persistent, like the one above, rather than transient).

Meanwhile, the task you enqueued runs. Perhaps it also fails after chaining its next task. Now you have two repeatedly executing tasks. Soon you have 4 - then 8 - then 16 - and so forth. Disaster!

"Ah, " you may say smugly, "I don't do anything important after chaining the next task. Nothing that could fail, at any rate!" Drop the smugness, however: Even if you do nothing wrong, it's possible (though admittedly unlikely) that the task queue can re-execute your task anyway, causing the same problem. This is why tasks are required to be idempotent. So how do we work around this?

The solution is straightforward: use task names. Task names ensure that the chained task is only enqueued once, even if the task doing the chaining gets re-executed repeatedly. There are many ways to derive a task name; here's one:

from google.appengine.api import taskqueue
from google.appengine.ext import deferred

def task_func(task_id):
  # Do some stuff
  try:
    task_id += 1
    # Task names may only contain letters, digits, hyphens and underscores,
    # so avoid characters like parentheses or percent signs here.
    deferred.defer(task_func, task_id, _name='task_func-%d' % task_id)
  except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError):
    pass
  florb # Still errors, but the chained task only gets added once!

The important thing is to ensure that the task name is unique, but remains the same if the task gets run multiple times. Here, we pass in a task_id and use it to generate the task name, incrementing the ID before passing it to the chained task, so each task in the chain uses the next ID. When we start off the first task, we simply have to make sure we assign it a unique ID number. Naturally, the same applies if your task fires off multiple other tasks: just ensure you use a unique, consistent name for each.
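To make the naming scheme concrete, here is a minimal sketch. The make_task_name helper and the use of a UUID to seed the first task are illustrative assumptions on my part, not part of the Task Queue API:

```python
import uuid

def make_task_name(chain_id, task_id):
    # Deterministic for a given (chain_id, task_id): if the task re-executes,
    # it regenerates the same name, so the duplicate enqueue is rejected.
    # Task names may only contain letters, digits, hyphens and underscores.
    return 'task-func-%s-%d' % (chain_id, task_id)

# Kicking off the chain: pick something unique per chain, e.g. a random UUID.
chain_id = uuid.uuid4().hex
first_name = make_task_name(chain_id, 0)
# deferred.defer(task_func, chain_id, 0, _name=first_name)
```

The uniqueness comes from chain_id (fixed for the whole chain), while the consistency across retries comes from task_id being derived from the task's position in the chain.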

If you want to see this in practice, the bulkupdate library makes use of this pattern, constructing task names from the datastore ID of the bulkupdate job (to ensure uniqueness) and the number of the chained task (to ensure consistency).
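That scheme might be sketched like this (the helper name and exact format are hypothetical; the library's actual internals may differ):

```python
def bulkupdate_task_name(job_id, chain_index):
    # job_id: the datastore ID of the bulkupdate job -- unique across jobs.
    # chain_index: this task's position in the chain -- identical on every
    # retry, so a re-executed task regenerates exactly the same name.
    return 'bulkupdate-%d-%d' % (job_id, chain_index)
```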
