High concurrency counters without sharding
Posted by Nick Johnson | Filed under python, app-engine, cookbook, coding
Sharded Counters are a well known technique for keeping counters with high update rates on App Engine. Less well known, however, are some of the alternatives, particularly in areas where you want to keep a reasonably accurate counter, but absolute accuracy isn't required. I discussed one option in this cookbook post - be sure to check the comments for an improved version - and today we'll discuss another option, which also makes use of memcache and the task queue.
The basic assumption is this: We want to keep as accurate a count as possible, but we're willing to accept that it may, in some cases, under-count. A good example of where this is true is counting downloads, or hits, or other such metrics.
Our solution has three major components:
- A 'permanent' count, stored in the datastore.
- A 'current' count, stored in memcache.
- A task queue task that updates the datastore with the total from memcache.
In order to implement this, we'll take advantage of the task queue's task name functionality, and 'tombstoned tasks' - the restriction that two tasks with the same name cannot be enqueued within a reasonable period (at least a week) of each other. Each time we want to update the counter, we increment the current value in memcache, and attempt to queue a task that will store the memcached total to the datastore at the end of the interval. The tradeoff, here, is the length of time we will cache the data for: The longer it is, the less load we put on the datastore, but the higher the risk of losing data from memcache before it's stored.
Let's start by defining the counter itself, and the code to get its current value:
class Counter(db.Model): count = db.IntegerProperty(required=True, default=0) @classmethod def get_value(cls, name): """Returns the value of the counter with the specified name.""" counter = cls.get_by_key_name(name) if counter: count = counter.count else: count = 0 count += memcache.get(name, cls.kind()) return count
The model itself has only one property - the count - and the method we've defined to get the current value retrieves the relevant instance - if it exists - and adds its value to the value stored in Memcache. One possible optimisation here would be to cache the counter entity in local memory - we know it's safe to do so at least until the end of the current time interval, however long that may be.
To define the update part of the code, we need a couple of prerequisites: We need a way to generate task names, such that there is one unique one for every interval of a given length, and we need to write the actual function that will copy the count from memcache to the datastore. First things first:
def get_interval_number(ts, duration): """Returns the number of the current interval. Args: ts: The timestamp to convert duration: The length of the interval Returns: int: Interval number. """ return int(time.mktime(ts.timetuple()) / duration)
A lot of verbiage for not much code! All we're doing here is converting a timestamp to a unix time - which is the number of seconds since the Unix Epoch - and then dividing it by the duration, giving us the number of 'intervals' since the epoch. Now for the second prerequisite: The 'flush' function:
@classmethod def flush_counter(cls, name): counter = cls.get_by_key_name(name) if not counter: counter = cls() # Get the current value value = memcache.get(name, cls.kind()) # Subtract it from the memcached value memcache.decr(name, value, cls.kind()) # Store it to the counter counter.count += value counter.put()
The observant amongst you may notice a couple of potential issues here. First up, we're not doing the update in a transaction. We're relying on the fact that this record is only updated in the task queue, and thus is unlikely to get clobbered by another concurrent update. It's also possible that the memcache.decr could succeed, but the counter.put() could fail, leading to under-counting. We've already stated we're willing to accept occasional under-counting, though, and this is a rare circumstance.
Finally, we can define the counter's increment method:
@classmethod def incr(cls, name, interval=5, value=1): """Increments the named counter. Args: name: The name of the counter. interval: How frequently to flush the counter to disk. value: The value to increment by. """ memcache.incr(name, value, cls.kind()) interval_num = get_interval_number(datetime.datetime.now(), interval) task_name = '-'.join([cls.kind(), name, interval, interval_num]) try: deferred.defer(cls.flush_counter, name, _name=task_name) except (taskqueue.TaskAlreadyExistsError, taskqueue.TombstonedTaskError): pass
And that's it! Note that we generate the name of the task from four things: The kind name, the name of the counter, the size of the interval, and the interval number. This keeps it unique, though you need to make sure to use the same interval each time you call incr, or your task can end up being updated more frequently than even the smaller of the intervals you use!
Previous Post Next Post