Distributed Transactions on App Engine

This is the fourth in a series of 'cookbook' posts describing useful strategies and functionality for writing better App Engine applications.

As promised, today we're going to discuss Distributed Transactions on App Engine. Distributed transactions are that feature that you didn't know you needed until they were gone: The ability to update any set of entities or records in your database in a single transaction. Due to the requirements of scale, App Engine supports transactions, but only on predefined Entity Groups. For most cases, this is all you need, but occasionally you really have to use a distributed or global transaction.

There are proposals for implementing distributed transactions on App Engine, but they're complicated, and none of them have yet been implemented as a robust library you can simply plug in and use. We're not going to attempt to recreate a general purpose distributed transaction system - at least, not today - instead, we'll address one common use-case for distributed transactions.

The usual example of the need for distributed or global transactions - so common that it's practically canonical - is the 'bank account' example. Suppose you have a set of accounts, defined something like this:

    from google.appengine.ext import db

    class Account(db.Model):
        owner = db.UserProperty(required=True)
        balance = db.IntegerProperty(required=True, default=0)

Naturally, you need to be able to transfer funds between accounts; those transfers need to be transactional, or you risk losing people's money, or worse (from a bank's point of view) duplicating it! You can't solve this by grouping users into entity groups, because it would still be impossible to transfer money between users assigned to different entity groups. Further, you need to be able to prevent people from overdrawing their accounts.

Fortunately, we can make it possible to do transactional transfers between accounts fairly simply. The key thing to realise, that makes everything much simpler, is that funds transfers do not have to be atomic. That is, it's okay to briefly exist in a state where the funds have been deducted from the paying account, but not yet credited to the payee account, as long as we can ensure that the transfer will always complete, and as long as we can maintain our invariants (such as the total amount of money in the bank) along the way.
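To make the invariant concrete, here is a minimal in-memory sketch. It uses plain dictionaries rather than the datastore, and the names `balances`, `in_flight` and `bank_total` are illustrative assumptions, not part of any App Engine API. It shows that if funds in transit are counted, the bank's total is the same before, during and after a transfer:

```python
def bank_total(balances, in_flight):
    """Total money in the bank: account balances plus debited-but-uncredited funds."""
    return sum(balances.values()) + sum(in_flight)

balances = {"alice": 100, "bob": 50}
in_flight = []  # amounts deducted from a payer but not yet credited to a payee

start_total = bank_total(balances, in_flight)

# First half: debit the payer and record the amount as in flight.
balances["alice"] -= 30
in_flight.append(30)
mid_total = bank_total(balances, in_flight)

# Second half: credit the payee and clear the in-flight amount.
balances["bob"] += 30
in_flight.remove(30)
end_total = bank_total(balances, in_flight)
```

The intermediate state is visible to other readers, but no money has been created or destroyed in it.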

Let's start by defining a simple transaction record model:

    class Transfer(db.Model):
        amount = db.IntegerProperty(required=True)
        target = db.ReferenceProperty(Account, required=True)
        other = db.SelfReferenceProperty()
        timestamp = db.DateTimeProperty(required=True, auto_now_add=True)

As you can see, this is fairly straightforward. A Transfer entity will always be the child entity of an Account; this is the account that the transaction is concerned with, and being a child entity means we can update it and the account transactionally, since they're in the same entity group. Each transfer will create two Transfer entities, one on the paying account, and one on the receiving account.

The amount field is fairly obvious; here we'll use it to signify the change in value to the account it's attached to, so the Transfer on the paying account will have a negative amount, while the Transfer on the receiving account will have a positive amount. The target field denotes the account the transfer was to or from, while the 'other' field denotes the matching Transfer entity on the other account.

At this point, it would help to describe the basic process we expect to follow in making a transfer between accounts:

  1. In a transaction, deduct the required amount from the paying account, and create a Transfer child entity to record this, specifying the receiving account in the 'target' field, and leaving the 'other' field blank for now.
  2. In a second transaction, add the required amount to the receiving account, and create a Transfer child entity to record this, specifying the paying account in the 'target' field, and the Transfer entity created in step 1 in the 'other' field.
  3. Finally, update the Transfer entity created in step 1, setting the 'other' field to the Transfer we created in step 2.
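The three steps can be traced with a small in-memory model. This is only a sketch: dictionaries stand in for the datastore, every name in it (`accounts`, `transfers`, `step1_debit` and so on) is hypothetical, and it ignores retries and concurrency entirely. Its purpose is to show what each record contains after each step:

```python
import itertools

# In-memory stand-ins for the datastore; every name here is hypothetical.
accounts = {"payer": 100, "payee": 20}
transfers = {}                 # (parent account, id) -> Transfer-like dict
_ids = itertools.count(1)

def step1_debit(src, dest, amount):
    """Deduct from the payer and record a Transfer with 'other' left blank."""
    if accounts[src] < amount:
        return None            # would overdraw: nothing is written
    accounts[src] -= amount
    key = (src, next(_ids))
    transfers[key] = {"amount": -amount, "target": dest, "other": None}
    return key

def step2_credit(src_key):
    """Credit the payee and record a mirror Transfer that points at step 1's."""
    src = transfers[src_key]
    dest = src["target"]
    accounts[dest] -= src["amount"]   # amount is negative, so this adds
    dest_key = (dest, str(src_key))
    transfers[dest_key] = {
        "amount": -src["amount"], "target": src_key[0], "other": src_key}
    return dest_key

def step3_link(src_key, dest_key):
    """Fill in the 'other' field that step 1 left blank."""
    transfers[src_key]["other"] = dest_key

src_key = step1_debit("payer", "payee", 30)
dest_key = step2_credit(src_key)
step3_link(src_key, dest_key)
```

After all three steps, the two records point at each other and carry equal and opposite amounts.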

Each of the three steps above is transactional, thanks to the guarantees made by the App Engine datastore. What's less obvious is that the process can only proceed forwards: once step 1 has succeeded (e.g., because the user had sufficient funds in their account at the time), steps 2 and 3 will inevitably succeed - either immediately, or at some later point if something causes a transient failure. A process picking up the pieces later can easily determine which steps have been completed, and pick up where the previous process left off, without omitting or repeating anything.
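As a rough illustration of how a recovering process could work out where to resume, the sketch below inspects the two records and reports the next step to run. The `next_step` helper and the dictionary representation are assumptions made for this example only:

```python
def next_step(src_transfer, dest_transfer):
    """Return the step (2 or 3) a recovering process should run, or None if done.

    src_transfer:  the paying account's record, a dict with an 'other' entry.
    dest_transfer: the receiving account's matching record, or None if it
                   has not been created yet.
    """
    if src_transfer["other"] is not None:
        return None   # step 3 finished: the transfer is complete
    if dest_transfer is None:
        return 2      # the payee has not been credited yet
    return 3          # payee credited; only the back-reference is missing
```

Because step 1 either fully happened or left no record at all, there is never a need to ask whether step 1 should be redone.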

Let's implement step 1, in the form of a 'transfer_funds' method:

    def transfer_funds(src, dest, amount):
        def _tx():
            account = Account.get(src)
            # Refuse the transfer rather than overdraw the paying account.
            if account.balance < amount:
                return None
            account.balance -= amount
            # Child of the paying account, so both writes share an entity group.
            transfer = Transfer(parent=account, amount=-amount, target=dest)
            db.put([account, transfer])
            return transfer
        return db.run_in_transaction(_tx)

Straightforward, right? At the point that this function returns successfully, the transaction can only go one way - forward. If the process currently handling it dies unexpectedly, another one can pick it up later, and 'roll it forward'. Since the process of completing a transaction and rolling it forward if it fails are one and the same, we'll define a roll_forward method that completes the transaction:

    def roll_forward(transfer):
        def _tx():
            # The key name is derived from the source Transfer, so a retry
            # finds the same entity instead of creating a second one.
            dest_transfer = Transfer.get_by_key_name(
                str(transfer.key()), parent=transfer.target.key())
            if not dest_transfer:
                dest_transfer = Transfer(
                    parent=transfer.target.key(),
                    key_name=str(transfer.key()),
                    amount=-transfer.amount,
                    target=transfer.key().parent(),
                    other=transfer)
                account = Account.get(transfer.target.key())
                # transfer.amount is negative, so subtracting it credits the account.
                account.balance -= transfer.amount
                db.put([account, dest_transfer])
            return dest_transfer
        dest_transfer = db.run_in_transaction(_tx)
        transfer.other = dest_transfer
        transfer.put()

This function is a little more complicated than transfer_funds, but it's still straightforward if we break it down: We pass in the transfer entity returned by transfer_funds. First, the function tries to fetch an existing Transfer for the destination account - this might already exist if a previous attempt to roll the transaction forward failed - using the receiving account as the parent, and specifying a key name based on the key of the paying account's Transfer entity. We need to specify a key name in order to ensure there can only be one matching Transfer entity for the destination account.

If the receiving account has no matching Transfer, we create one, specifying the amount and target based on the first Transfer, and setting the 'other' field to the first Transfer. Then, we fetch the Account, add the transferred amount to its funds, and put both the new Transfer and the updated Account back to the datastore.

Finally, outside the transaction, we get the returned dest_transfer entity, and update the original Transfer entity to reference it. We don't need to use another transaction when we store this entity back to the datastore, because the only possible modification of a Transfer after creating it is to set the 'other' field, which is what we're doing.
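The reason the key name matters is easiest to see in isolation. The sketch below is an in-memory stand-in, not datastore code: `dest_transfers`, `balances` and `credit_once` are hypothetical names, and in the real function the check and the write happen together inside a datastore transaction. It shows that a credit keyed on a deterministic name can be retried without paying out twice:

```python
balances = {"payee": 20}
dest_transfers = {}  # key name -> credited amount, standing in for child Transfers

def credit_once(key_name, account, amount):
    """Credit `account` only if no record with this key name exists yet."""
    if key_name not in dest_transfers:
        dest_transfers[key_name] = amount
        balances[account] += amount
    return dest_transfers[key_name]

credit_once("transfer-123", "payee", 30)
credit_once("transfer-123", "payee", 30)  # retry after a failure: no second credit
```

Had the record been created with an automatically assigned ID instead, the retry would have had no way of finding the first attempt's record.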

That, in a nutshell, is how to transfer money between accounts in App Engine in a robust and consistent fashion. Simply call transfer_funds(src, dest, amount), then, provided it did not return None for insufficient funds, call roll_forward() on the returned Transfer object. If you wish, you don't even have to roll the transfer forward right away - for example, you can enqueue the key of the returned Transfer in the Task Queue, and leave it up to the task to complete the transaction, thus decreasing user-perceived latency for transfers.
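The shape of the deferred approach might look something like the following sketch. A plain `deque` stands in for the Task Queue here, and `handle_request`, `run_worker` and the string keys are hypothetical; a real application would enqueue an actual task and call roll_forward on the fetched Transfer:

```python
from collections import deque

pending = deque()   # stand-in for the Task Queue
completed = []      # records which keys the worker has processed

def handle_request(transfer_key):
    """Request handler: step 1 is assumed done, so just enqueue the key."""
    pending.append(transfer_key)
    return "accepted"

def run_worker(roll_forward):
    """Worker: drain the queue, completing each transfer with `roll_forward`."""
    while pending:
        roll_forward(pending.popleft())

handle_request("transfer-1")
handle_request("transfer-2")
run_worker(completed.append)  # `completed.append` stands in for roll_forward
```

The user's request returns as soon as the key is enqueued; the credit to the receiving account happens whenever the worker gets to it.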

You may be wondering, though, how partially applied transactions get rolled forward. The solution is simple: We find Transfer entities with their 'other' field unset, and call the roll_forward method on them:

    import datetime

    def execute_unapplied_transactions(count=20):
        # Only consider transfers created more than 30 seconds ago.
        cutoff = datetime.datetime.now() - datetime.timedelta(seconds=30)
        q = Transfer.all().filter("other =", None).filter("timestamp <", cutoff)
        for transfer in q.fetch(count):
            roll_forward(transfer)

This function can be executed from a cron job or the task queue at intervals, to ensure that any failed transactions get rolled forward. If you're taking the 'deferred completion' approach described above, you can even leave it up to this method to roll forward all transactions!
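The selection logic can be tried out without a datastore. In the sketch below, `stale_unapplied` is a hypothetical helper that applies the same two conditions to a list of dictionaries; the age cutoff presumably exists so that transfers still being completed by their original request are left alone:

```python
import datetime

def stale_unapplied(transfers, now, min_age=datetime.timedelta(seconds=30), count=20):
    """Return up to `count` transfers with 'other' unset that are older than `min_age`."""
    cutoff = now - min_age
    matches = [t for t in transfers
               if t["other"] is None and t["timestamp"] < cutoff]
    matches.sort(key=lambda t: t["timestamp"])  # oldest first
    return matches[:count]

now = datetime.datetime(2020, 1, 1, 12, 0, 0)
sample = [
    {"id": "old-open", "other": None, "timestamp": now - datetime.timedelta(seconds=60)},
    {"id": "new-open", "other": None, "timestamp": now - datetime.timedelta(seconds=5)},
    {"id": "old-done", "other": "x", "timestamp": now - datetime.timedelta(seconds=60)},
]
to_roll_forward = stale_unapplied(sample, now)
```

Only the old, unfinished transfer is selected; the recent one is skipped until a later run, and the finished one is never picked up again.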

In the next post, we'll be returning to the Bulk Loader, and demonstrating how to load data directly from an SQL database, or nearly any other data source.

