Bulk updates with cursors

Last week, I blogged about cursors, a new feature in version 1.3.1 of the App Engine SDK. Today, I'm going to demonstrate a practical use for them: bulk datastore updates.

In both the Remote API and deferred articles, I used a (perhaps poorly named) 'mapper' class as an example of ways to use these libraries. In neither case was the class intended to be anything other than a sample use case for the library, but nevertheless, people have used the examples in production. The introduction of cursors provides a prime opportunity to introduce a more robust, yet simpler, version of the bulk updater concept.

First, let's define a few requirements for our bulk updater:

  • Supports any query for which a cursor can be obtained
  • Handles failure of individual updates gracefully
  • Can fail the whole update process if enough errors are encountered
  • Handles timeout errors, service unavailability, etc., transparently
  • Can report completion to admins

As in the Remote API and deferred articles, we'll implement the updater as an abstract class, which individual updater implementations should subclass. Here's the basic interface:

import logging
import time
from google.appengine.api import mail
from google.appengine.ext import db
from google.appengine.ext.deferred import defer
from google.appengine.runtime import apiproxy_errors


class BulkUpdater(object):
  """A bulk updater for datastore entities.
  
  Subclasses should implement, at a minimum, get_query and handle_entity.
  """

  # Number of entities to put() at once.
  PUT_BATCH_SIZE = 20
  
  # Number of entities to delete() at once.
  DELETE_BATCH_SIZE = 20
  
  # Maximum time, in seconds, to spend processing before enqueueing the next task.
  MAX_EXECUTION_TIME = 20.0
  
  # Maximum number of failures to tolerate before aborting. -1 indicates
  # no limit, in which case the list of failed keys will not be retained.
  MAX_FAILURES = 0
  
  def __init__(self):
    self.__to_put = []
    self.__to_delete = []
    self.__failed_keys = []
    self.num_processed = 0
    self.num_tasks = 0
    self.num_put = 0
    self.num_deleted = 0
  
  def get_query(self):
    """Returns the query to iterate over.

    Returns:
      A db.Query or db.GqlQuery object. The returned query must support cursors.
    """
    raise NotImplementedError()

  def handle_entity(self, entity):
    """Performs processing on a single entity.
    
    Args:
      entity: A db.Model instance to update.
    """
    raise NotImplementedError()

  def finish(self, success, failed_keys):
    """Finish processing. Called after all entities have been updated.
    
    Args:
      success: boolean: Indicates if the process completed successfully, or was
        aborted due to too many errors.
      failed_keys: list: A list of db.Key objects that could not be updated.
    """
    pass

The first thing we do is define some constants that will affect the operation of our updater - the batch sizes for put and delete operations, and the maximum time to execute before enqueueing the next task. This last one is necessary because tests have shown that the approach used in the deferred article, of catching the first deadline error and enqueueing the next task then, is not sufficiently reliable. We also define a maximum number of update failures to tolerate before aborting the update process.

We need some way for updater instances to propagate their changes back to the datastore. As with previous mappers, we want to batch these operations for efficiency. This time, let's define helper methods that the handle_entity() method can call:

  def put(self, entities):
    """Stores updated entities to the datastore.
    
    Updates are batched for efficiency.
    
    Args:
      entities: An entity, or list of entities, to store.
    """
    if isinstance(entities, db.Model):
      entities = [entities]
    self.__to_put.extend(entities)
    
    while len(self.__to_put) > self.PUT_BATCH_SIZE:
      db.put(self.__to_put[-self.PUT_BATCH_SIZE:])
      del self.__to_put[-self.PUT_BATCH_SIZE:]
      self.num_put += self.PUT_BATCH_SIZE

  def delete(self, entities):
    """Deletes entities from the datastore.
    
    Deletes are batched for efficiency.
    
    Args:
      entities: An entity, key, or list of entities or keys, to delete.
    """
    if isinstance(entities, (db.Key, db.Model, basestring)):
      entities = [entities]
    self.__to_delete.extend(entities)
    
    while len(self.__to_delete) > self.DELETE_BATCH_SIZE:
      db.delete(self.__to_delete[-self.DELETE_BATCH_SIZE:])
      del self.__to_delete[-self.DELETE_BATCH_SIZE:]
      self.num_deleted += self.DELETE_BATCH_SIZE

These methods are fairly straightforward: we add the passed-in entities or keys to our internal lists for batching purposes, and whenever a list holds more than a batch's worth of entries, we write out a full batch. We use a while loop rather than an if, because a single call to put() or delete() could add more than one batch's worth of entries to put or delete.
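
To see the batching behaviour in isolation, here is a minimal standalone sketch of the same logic. It makes no App Engine calls: BatchBuffer and its flush callback are hypothetical stand-ins for the updater and db.put(), not part of the module.

```python
class BatchBuffer(object):
  """Hypothetical stand-in for the updater's put()/delete() buffering."""

  def __init__(self, batch_size, flush):
    self.batch_size = batch_size
    self.flush = flush  # Called with each full batch; stands in for db.put().
    self.pending = []
    self.num_flushed = 0

  def add(self, items):
    if not isinstance(items, list):
      items = [items]
    self.pending.extend(items)
    # A single add() may queue several batches' worth, hence 'while'.
    while len(self.pending) > self.batch_size:
      self.flush(self.pending[-self.batch_size:])
      del self.pending[-self.batch_size:]
      self.num_flushed += self.batch_size


batches = []
buf = BatchBuffer(3, batches.append)
buf.add(list(range(8)))  # Flushes two batches of three; two items stay queued.
```

Whatever is left in pending corresponds to the entities that have to be written out once processing stops.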

Now we can implement the code that does the actual work. We'll start by defining a method that executes a single batch of work:

  def __process_entities(self, q):
    """Processes a batch of entities.
    
    Args:
      q: A query to iterate over doing processing.
    Returns:
      True if the update process has finished, False otherwise.
    """
    end_time = time.time() + self.MAX_EXECUTION_TIME
    for entity in q:
      try:
        self.handle_entity(entity)
      except (db.Timeout, apiproxy_errors.CapabilityDisabledError,
              apiproxy_errors.DeadlineExceededError):
        # Give up for now - reschedule for later.
        return False
      except Exception, e:
        # User exception - log and (perhaps) continue.
        logging.exception("Exception occurred while processing entity %r",
                          entity.key())
        if self.MAX_FAILURES >= 0:
          self.__failed_keys.append(entity.key())
          if len(self.__failed_keys) > self.MAX_FAILURES:
            # Update completed (failure)
            return True
      
      self.num_processed += 1
      
      if time.time() > end_time:
        return False
    
    # The loop finished - we're done!
    return True

The __process_entities method takes a query, already positioned at the start of a batch, and iterates over it. We use iteration rather than the more efficient fetch(), because we don't know how many entities we will be able to process in the allotted time.

Most of this method is taken up with exception handling. Three exceptions are treated specially: db.Timeout, apiproxy_errors.CapabilityDisabledError, and apiproxy_errors.DeadlineExceededError all terminate the batch immediately. We give up immediately on Timeout errors because of the changes in 1.3.1, which mean that a Timeout returned to our code almost certainly indicates a need to retry at a later stage. Other errors are assumed to be user errors, and are caught and logged. If there's a finite threshold for the maximum number of user errors, the key of the failing entity is recorded, and we abort once the number of failures exceeds that threshold. Finally, after processing each entity, we check the current system time to determine if we have reached our self-imposed deadline. The method returns True if processing is done, whether due to success or failure, and False otherwise.
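
The control flow is easier to follow without the datastore in the way. The following is only a sketch, in plain Python, of the same three exits (aborted, out of time, finished). The process() function, its clock parameter and reject_odd() are hypothetical names introduced for illustration, and unlike the real method it treats every exception as a user error.

```python
import time


def process(items, handle, max_seconds, max_failures, failed, clock=time.time):
  """Returns True if finished or aborted, False if the time budget ran out."""
  end_time = clock() + max_seconds
  for item in items:
    try:
      handle(item)
    except Exception:
      # This sketch treats every exception as a user error.
      if max_failures >= 0:
        failed.append(item)
        if len(failed) > max_failures:
          return True  # Too many failures: abort.
    if clock() > end_time:
      return False  # Out of time: the caller should reschedule.
  return True  # The iterator is exhausted.


def reject_odd(n):
  if n % 2:
    raise ValueError(n)


# Tolerate one failure; the second one (at 3) aborts the run.
failed = []
aborted = process(iter(range(10)), reject_odd, 60.0, 1, failed)

# A fake clock that jumps past the one-second budget on the second check.
ticks = iter([0.0, 0.5, 1.5])
finished = process(iter(range(10)), lambda n: None, 1.0, 0, [],
                   clock=lambda: next(ticks))
```

Passing the clock in as a parameter is purely a convenience for the sketch; it lets the timeout path be exercised without actually waiting.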

Now we can define run(), the method that handles the whole process:

  def run(self, _start_cursor=None):
    """Begins or continues a batch update process."""
    q = self.get_query()
    if _start_cursor:
      q.with_cursor(_start_cursor)
    
    finished = self.__process_entities(q)
    
    # Store or delete any remaining entities
    if self.__to_put:
      db.put(self.__to_put)
    if self.__to_delete:
      db.delete(self.__to_delete)
    self.num_put += len(self.__to_put)
    self.__to_put = []
    self.num_deleted += len(self.__to_delete)
    self.__to_delete = []
    
    self.num_tasks += 1
    
    if finished:
      logging.info(
          "Processed %d entities in %d tasks, putting %d and deleting %d",
          self.num_processed, self.num_tasks, self.num_put, self.num_deleted)
      # A negative MAX_FAILURES means 'no limit', so such a job always succeeds.
      self.finish(self.MAX_FAILURES < 0
                  or len(self.__failed_keys) <= self.MAX_FAILURES,
                  self.__failed_keys)
    else:
      defer(self.run, q.cursor())

run()'s main job is to create a query with which to call __process_entities(), and to clean up after it by storing and deleting any remaining entities. Finally, it checks if the process has finished; if it has, it calls finish(); otherwise, it enqueues the next task, picking up where this one left off.
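
The resume-where-we-left-off pattern can be sketched without the SDK. In this illustration, which is an assumption-laden toy rather than the real thing, a list index plays the role of the cursor, a deque plays the role of the task queue, and the per-task budget is a count of items rather than a time limit.

```python
from collections import deque

processed = []
tasks = deque()  # Hypothetical stand-in for the task queue.


def run(items, per_task, start_cursor=0):
  """Handles up to per_task items, then enqueues a continuation if needed."""
  cursor = start_cursor
  while cursor < len(items) and cursor - start_cursor < per_task:
    processed.append(items[cursor])
    cursor += 1
  if cursor < len(items):
    # Not finished: pass the position on, much as defer(self.run, q.cursor()) does.
    tasks.append((run, (items, per_task, cursor)))


tasks.append((run, (list("abcdefg"), 3)))
num_tasks = 0
while tasks:
  func, args = tasks.popleft()
  func(*args)
  num_tasks += 1
```

Seven items at three per task take three tasks, and each task starts exactly where the previous one stopped.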

Back in the original requirements, we included the requirement that it be possible to report completion to admins. Let's do that with a mixin:

class ReportingMixin(object):
  def __init__(self, email_sender=None):
    """Constructor.
    
    Args:
      email_sender: If set, send a completion email to admins, from the provided
        email address.
    """
    super(ReportingMixin, self).__init__()
    self.email_sender = email_sender

  def finish(self, success, failed_keys):
    super(ReportingMixin, self).finish(success, failed_keys)
    if not self.email_sender:
      return

    if success:
      message = "Bulk update job %s completed successfully!\n\n" % self.__class__
      subject = "Bulk update completed"
    else:
      message = "Bulk update job %s failed.\n\n" % self.__class__
      subject = "Bulk update FAILED"
    
    message += ("Processed %d entities in %d tasks, putting %d and deleting %d\n\n"
                % (self.num_processed, self.num_tasks, self.num_put,
                   self.num_deleted))
    
    if failed_keys:
      message += "Processing failed for the following keys:\n"
      for key in failed_keys:
        message += "%r\n" % key
    
    mail.send_mail_to_admins(self.email_sender, subject, message)

This mixin simply extends the finish() method, and if a sender address is provided, sends an email from it to all the app's admins, giving a brief report of the process's completion or failure.
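
The mixin relies on cooperative super() calls, so the order of the base classes matters. Here is a minimal sketch of the mechanism in plain Python. Base, ReportingSketch and Job are hypothetical names, and the sketch records its report in a list instead of sending mail.

```python
class Base(object):
  def __init__(self):
    self.num_processed = 0

  def finish(self, success, failed_keys):
    pass


class ReportingSketch(object):
  """Hypothetical mixin: records a report instead of sending mail."""

  def __init__(self, recipient=None):
    super(ReportingSketch, self).__init__()
    self.recipient = recipient
    self.reports = []

  def finish(self, success, failed_keys):
    super(ReportingSketch, self).finish(success, failed_keys)
    if not self.recipient:
      return
    status = "completed" if success else "FAILED"
    self.reports.append("Bulk update %s (%d failed)" % (status, len(failed_keys)))


class Job(ReportingSketch, Base):
  pass


job = Job("admin@example.com")
job.finish(False, ["key1", "key2"])
```

Because the mixin is listed first, its __init__() and finish() run first and then delegate along the method resolution order to Base.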

Finally, we can define a couple of simple classes for commonly used types of update operation:

class BulkPut(ReportingMixin, BulkUpdater):
  def __init__(self, query, email_sender=None):
    super(BulkPut, self).__init__(email_sender)
    self.query = query

  def get_query(self):
    return self.query

  def handle_entity(self, entity):
    self.put(entity)


class BulkDelete(ReportingMixin, BulkUpdater):
  def __init__(self, query, email_sender=None):
    super(BulkDelete, self).__init__(email_sender)
    self.query = query

  def get_query(self):
    return self.query

  def handle_entity(self, entity):
    self.delete(entity)

These two classes are almost identical, except for the operation carried out on each entity. In each case, the constructor takes a Query object, which is stored as an instance attribute and returned by get_query; this works because Query objects are picklable, and run() is guaranteed not to modify the query except by calling .with_cursor() on it.

We can test our updater from the remote_api console, like so:

notdot-blog> updater = bulkupdate.BulkPut(models.BlogPost.all())
notdot-blog> updater.MAX_EXECUTION_TIME=1.0
notdot-blog> defer(updater.run)

Checking the admin console shows the deferred tasks being executed, and checking our email shows a message in our inbox titled "Bulk update completed".

As always, bear in mind the limitations of the deferred library when it comes to import path changes, etcetera.

The complete source of our new bulk updater can be found here.

Finally, a few suggestions for how you can use this module:

  • Updating instances of a model whose definition has changed, for indexing purposes, using the BulkPut class we defined above.
  • Bulk deleting an entity kind, or a subset of it, using the BulkDelete class we defined above.
  • Calculating global statistics by storing them against the BulkUpdater instance across requests. Make sure these remain small - if the pickled size of the updater instance exceeds 10k, each deferred invocation will have to load it from the datastore and store it again at the end!
  • Migrating models to new definitions or kind names.
  • Performing more complex 'map' operations, such as inserting or updating one entity based on the contents of another.
  • Doing periodic updates of stored counts, etc.
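
On the statistics suggestion, a rough way to check whether accumulated state stays small is to pickle it and look at the size. The sketch below uses plain Python and made-up data; the 10k figure is the one quoted above, and the variable names are illustrative only.

```python
import pickle

LIMIT = 10 * 1024  # The 10k figure quoted above.

# Aggregates stay roughly the same size however many entities are seen...
aggregate = {"count": 0, "total": 0}
# ...whereas remembering something per entity grows without bound.
per_entity = []

for i in range(5000):
  aggregate["count"] += 1
  aggregate["total"] += i
  per_entity.append("key-%04d" % i)

aggregate_size = len(pickle.dumps(aggregate))
per_entity_size = len(pickle.dumps(per_entity))
```

Here the running count and total pickle to well under the limit, while the per-entity list is several times over it.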

Got more ideas? Mention them in the comments! Are you using this class for something novel? Let us know!
