Exploring the new mapper API

One of the new features announced at this year's Google I/O is the new mapper library. This library makes it easy to perform bulk operations on your data, such as updating it, deleting it, or transforming/filtering/processing it in some fashion, using the 'map' (and soon, 'reduce') pattern. I'm happy to say that I'm deprecating my own bulkupdate library in favor of it.

The mapper API isn't just limited to mapping over datastore entities, either. You can map over lines in a text file in the blobstore, or over the contents of a zip file in the blobstore. It's even possible to write your own data sources - something we'll cover in a later post. Today, though, I'd like to dissect the demo that was presented at I/O. The demo uses a number of the mapper framework's more sophisticated features, so it's a good one to use to get an idea for how the framework works.

For basic usage, the Getting Started page in the mapper docs is the place to go. If you're interested in seeing something more complex in practice, read on...
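
If you just want the flavour of the simplest case first, here's a rough sketch of a plain datastore mapper. The module and model names ('mymodule', 'Greeting') are made up for illustration, but the shape follows the Getting Started docs:

from google.appengine.ext import db
from mapreduce import operation as op

class Greeting(db.Model):
  content = db.StringProperty()

def lowercase_content(entity):
  """Called once per Greeting entity; normalises its content field."""
  if entity.content:
    entity.content = entity.content.lower()
    yield op.db.Put(entity)

The matching mapreduce.yaml entry would look something like this, with DatastoreInputReader handing each entity to the handler in turn:

mapreduce:
- name: Lowercase greetings
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: mymodule.lowercase_content
    params:
    - name: entity_kind
      default: mymodule.Greeting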

The demo at I/O consisted of a "code search" system, whereby you could upload a zipfile containing source code (or other text files) to the blobstore, then find every line matching a regular expression inside that file. Without further ado, let's see the mapper function that demo used:

import re

from google.appengine.ext import db
from mapreduce import context
from mapreduce import operation as op

import model  # the demo's own datastore models (CodesearchResults and friends)

def codesearch((file_info, reader)):
  """Searches files in a zip for matches against a regular expression."""
  params = context.get().mapreduce_spec.mapper.params
  regex = re.compile(params["regex"])
  parent_key = db.Key(params["parent_key"])
  
  if file_info.file_size == 0:
    return

  results = model.CodesearchResults(
      parent=parent_key, filename=file_info.filename)
  file_data = reader()
  for line_no, line in enumerate(file_data.split('\n')):
    if regex.search(line):
      results.match_lines.append(line_no)
      results.matches.append(line)
  
  if results.matches:
    yield op.db.Put(results)

This is our mapper function. It gets called once for each entry in the zip file being processed, and is passed a single argument. When iterating over a zipfile, that argument is a tuple consisting of a zipfile.ZipInfo object providing information about the entry, and a zero-argument function (here called 'reader') that, when called, returns the entire contents of the stored file. The reason the framework doesn't simply pass in the contents of the entry directly is efficiency: if you're writing a mapper that only needs to process some of the files in a zip, there's no point wasting time reading the rest out and passing them to the mapper, only to have them discarded.
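
For instance, a hypothetical variant of this mapper that only cared about Python source files could bail out before touching the zip data at all (this is a sketch for illustration, not part of the demo):

def python_only_codesearch((file_info, reader)):
  """Hypothetical mapper that only ever inspects .py entries."""
  if not file_info.filename.endswith('.py'):
    # Skipped entries cost almost nothing: reader() is never called, so the
    # entry's data is never read out of the blobstore or decompressed.
    return
  contents = reader()
  # ... search 'contents' just as the demo mapper above does ...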

The first thing our function does is get the user-defined parameters. This is a dict of values provided by the user (and, as we'll see later, optionally by our code) when they started the mapreduce process. From it we extract 'regex', the regular expression to search on, and 'parent_key', the key of the entity under which we should create all our result entities.

Next up, we check the length of the file, and skip it if it's empty. The main reason for this check is that directories also show up as zip entries, and skipping zero-length entries is the easiest way to pass over them - we don't care about directories or empty files.
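
You can see the same thing with the standard zipfile module outside App Engine - directory entries show up in an archive's listing with a file_size of 0 ('example.zip' here is just a hypothetical archive):

import zipfile

archive = zipfile.ZipFile('example.zip')
for info in archive.infolist():
  print info.filename, info.file_size  # directory entries report a size of 0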

Now we do the real work. First, we create a results entity under the parent key we retrieved earlier. Then, we call the passed-in reader function to retrieve the contents of the file, and we iterate over each line of it. Inside the for loop, we apply the regular expression to each line, and if it matches, we update the result entity with the line the match occurred in, and the contents of that line.

You'll note that we're compiling the regular expression afresh with each call to the mapper function. This seems inefficient, but here we're relying on a feature of the Python regular expression library: The regular expression cache. It transpires that Python caches regular expressions, so that if you call compile on the same regex twice, it will use the cached copy the second time. Since the mapper function will be called over and over again, we're able to use this to good effect.
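
You can see the cache at work in a plain interpreter session. This relies on an implementation detail of CPython's re module (the cache only holds a limited number of patterns), so it's shown purely to illustrate why the repeated compile is cheap:

import re

first = re.compile(r'TODO|FIXME')
second = re.compile(r'TODO|FIXME')
# In CPython, the second compile is served from re's internal cache, so both
# names end up bound to the very same pattern object.
assert first is second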

Finally, we check if any lines matched in the file. If they did, we yield a 'Put' operation, instructing the mapper framework to write the new result entity to the datastore. If there were no matches in this file, we don't make the call, so the entity never gets written.
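
Put isn't the only operation a mapper can yield, either. As far as I'm aware, the framework also provides counter operations, which it aggregates and displays on the mapreduce status page. Here's a hypothetical mapper that only tallies counters and writes nothing at all (the optional delta argument to Increment is my assumption of the API):

from mapreduce import operation as op

def count_todos((file_info, reader)):
  """Hypothetical mapper that counts TODO markers instead of storing results."""
  if file_info.file_size == 0:
    return
  todo_lines = [line for line in reader().split('\n') if 'TODO' in line]
  if todo_lines:
    yield op.counters.Increment('files_with_todos')
    # Add the number of matching lines to a second counter in one go.
    yield op.counters.Increment('todo_lines', len(todo_lines))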

That's all there is to the mapper function, but there's still a little more we need to do. You'll note that we accepted as one of our parameters a 'parent_key'. It's not particularly friendly to require users to create the parent entity and supply its key, though - it'd be far more helpful if we could allow them to supply the ID of a file in the datastore, and figure out the parent key ourselves. Well, it turns out that's possible, by using a 'validator' function.

Validator functions get called once before the mapreduce starts, and are given the set of user-supplied parameters for the mapreduce, and allowed the opportunity to validate them (what a surprise, eh?) and modify them if they wish. Here's ours:

def codesearch_validator(user_params):
  """Validates and extends parameters for a codesearch MR."""
  file = model.ZipFile.get_by_id(int(user_params["file_id"]))
  user_params["blob_key"] = str(file.blob.key())

  parent_model = model.CodesearchJob(
    name=user_params["job_name"],
    file=file,
    regex=user_params["regex"])
  parent_model.put()
  user_params["parent_key"] = str(parent_model.key())

  return True

In this case, we expect the user to supply a 'file_id' parameter, which should be the id of an entity in our datastore with information about an uploaded file. From that, we extract the key of the blob to process, which allows our user to avoid having to enter that information by hand, as well. Then, we construct a new CodesearchJob entity, which will act as the parent entity we referred to above, and add its key as the 'parent_key' parameter. Finally, we return True to indicate that validation succeeded.
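
The demo's model module isn't shown in this post, but based on the fields the mapper and validator touch, it presumably looks something like the following. The property types here are my guesses rather than the demo's actual definitions:

from google.appengine.ext import blobstore
from google.appengine.ext import db

class ZipFile(db.Model):
  """An uploaded zipfile; 'blob' refers to the data in the blobstore."""
  blob = blobstore.BlobReferenceProperty()

class CodesearchJob(db.Model):
  """A single search job; acts as the parent entity for its results."""
  name = db.StringProperty()
  file = db.ReferenceProperty(ZipFile)
  regex = db.StringProperty()

class CodesearchResults(db.Model):
  """Matches found in one file within the zip."""
  filename = db.StringProperty()
  match_lines = db.ListProperty(int)
  matches = db.StringListProperty()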

The one remaining component is the definition of the mapreduce. Here it is, from mapreduce.yaml:

mapreduce:
- name: Codesearch
  mapper:
    input_reader: mapreduce.input_readers.BlobstoreZipInputReader
    handler: mr.codesearch
    params_validator: mr.codesearch_validator
    params:
    - name: file_id
    - name: regex
    - name: job_name
    - name: processing_rate
      default: 10000
    - name: shard_count
      default: 20

None of this should be unexpected at this point. We give our mapreduce a name, and specify BlobstoreZipInputReader as the input reader class to use. For the handler, we provide the fully qualified name of the handler function we defined above, and likewise for the params_validator, we supply the fully qualified name of our validator function. Finally, we define a set of parameters that may be provided by the user when they create the mapreduce: file_id, regex, job_name, processing_rate, and shard_count.

The eagle-eyed amongst you may have noticed that the last two parameters appear nowhere in our code. That's because they're used by the mapper framework itself, to decide how many shards to start up and how quickly to process the input. There are several parameters of this type - we already saw another one in the validator function, 'blob_key', which is used by the input reader.
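
These framework-level parameters also crop up if you start a mapreduce from application code rather than from the /mapreduce admin page. The sketch below uses the library's control.start_map call as I understand it, kicking off the hypothetical 'lowercase' job from earlier; as far as I know the params_validator from mapreduce.yaml isn't applied when you start a job this way, which is why I'm not using the codesearch job here:

from mapreduce import control

# A sketch only: the parameter names mirror those in a mapreduce.yaml
# mapper definition.
mapreduce_id = control.start_map(
    name='Lowercase greetings',
    handler_spec='mymodule.lowercase_content',
    reader_spec='mapreduce.input_readers.DatastoreInputReader',
    mapper_parameters={'entity_kind': 'mymodule.Greeting'},
    shard_count=8)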

That's all for today. In a future post, we'll take a look inside the BlobstoreZipInputReader, and discuss how to write your own input reader class for the mapper framework.
