Using the Channel API on App Engine for instant traffic analysis

In last week's post, we introduced Clio, a system for getting live insight into your site's traffic patterns, and we described how the Prospective Search API lets us filter the site's traffic to get just the records we care about.

This week, we'll cover the other part of the system: delivering results in real time. For this, we'll use the Channel API to stream new log entries to admin users as they arrive. As with last week's post, where there are differences between our demo implementation and what you'd use in a real-world system, I'll point them out.

The admin interface

First up, we need to provide a simple admin interface to which we'll stream results. Here's the handler for that:

class IndexHandler(webapp.RequestHandler):
  """Serve up the Clio admin interface."""

  def get(self):
    client_id = os.urandom(16).encode('hex')
    channel_key = channel.create_channel(client_id)
    template_path = os.path.join(os.path.dirname(__file__),
                                 'templates', 'index.html')
    self.response.out.write(template.render(template_path, {
        'config': config,
        'client_id': client_id,
        'channel_key': channel_key,
    }))

The only thing of significance we do here relates to the Channel API. First, we generate a random client ID by getting some random data and hex-encoding it. We pass that to the Channel API's channel.create_channel function to create a new channel, and are given back the channel_key, a unique value that lets our client connect to the channel. Then, we render a standard template, passing in those values (along with some site-wide config information).
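One portability note on the client ID: str.encode('hex') only exists on Python 2. As a minimal sketch, here's an equivalent helper that behaves the same on Python 2 and Python 3. The name make_client_id is made up for illustration and isn't part of Clio:

```python
import binascii
import os


def make_client_id(num_bytes=16):
    """Return a random hex string usable as a channel client ID.

    Hypothetical helper: 16 random bytes give a 32-character ID,
    matching what os.urandom(16).encode('hex') produces on Python 2.
    """
    return binascii.hexlify(os.urandom(num_bytes)).decode('ascii')


client_id = make_client_id()
```

The result can be passed to channel.create_channel exactly as in the handler above.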

If this were a more complete project, we'd likely not take this approach, and would instead have the page make an AJAX call back to the server to request a channel key. That way, when the channel expires after two hours, the page can request a new one and keep on seamlessly serving results, rather than timing out and having to be reloaded (and requiring the user to re-add all their subscriptions).

The admin interface, while straightforward, has a reasonable amount of JavaScript code. Let's have a look at the basic page layout first, then we'll examine the JavaScript. Here's the bulk of the page:

{% extends "base.html" %}
{% block title %}Clio{% endblock %}
{% block head %}
  <script type="text/javascript" src="/_ah/channel/jsapi"></script>
  <script type="text/javascript" src="https://ajax.googleapis.com/ajax/libs/jquery/1.6.1/jquery.min.js"></script>
  <style type="text/css">
    tr {
      border-bottom: 1px solid black;
    }
  </style>
{% endblock %}
{% block body %}
  <h1>Clio Console</h1>
  <table>
    <thead>
      <tr><th>Method</th><th>Path</th><th>Status Code</th><th>Wall Time</th><th>CPU Time</th></tr>
    </thead>
    <tbody id="results">
    </tbody>
  </table>
  <div id="querybox">
    Enter query:
    <input type="text" id="query" />
    <input type="button" id="querybutton" value="Submit" />
  </div>
{% endblock %}

We include two JavaScript files here. The first is the Channel API client library, served from /_ah/channel/jsapi. The second is jQuery, which lets us write much neater, cleaner JavaScript than we could without it. The rest of the page is a pretty bare skeleton: we define a header, an empty table that will be filled with log entries, and a very simple form for sending new queries. This form submits to the SubscribeHandler, which we covered in last week's post.

Let's take a look at the JavaScript that makes the admin interface do its thing. First up, we define a few variables we'll need:

  <script type="text/javascript">
    client_id = '{{client_id}}';
    channel_key = '{{channel_key}}';
    subscriptions = [];
    columns = ['method', 'path', 'status_code', 'wall_time', 'cpu_time'];

client_id and channel_key are both replaced by the template engine with the actual client ID and channel key we supplied. subscriptions will be an array of subscription IDs; we don't actually do anything with it here, but a more sophisticated implementation would track these so users can remove existing subscriptions, as well as categorize incoming results by the subscriptions they matched. Finally, we define a list of the columns present in our table so we can easily generate markup for it.

    function add_message(message) {
      var row = $('<tr />');
      $('<td />', {
        'colspan': columns.length,
        'text': message,
      }).appendTo(row);
      row.appendTo('#results');
    }

    $(document).ready(function () {
      channel = new goog.appengine.Channel(channel_key);
      socket = channel.open();
      socket.onopen = function() {
        add_message('Channel established.');
      };
      socket.onmessage = function(message) {
        var data = jQuery.parseJSON(message.data);
        var row = $('<tr />');
        for(var i = 0; i < columns.length; i++) {
          $('<td />', {
            'text': data.data[columns[i]],
          }).appendTo(row);
        }
        row.appendTo('#results');
      };
      socket.onerror = function(error) {
        add_message('Channel error: ' + error.description);
      };
      socket.onclose = function() {
        add_message('Channel closed.');
      };

Here's where we do most of the work, but it should still be fairly easy to understand. We define a utility function, add_message, which lets us add informative messages to the table we defined. We call it from the socket.onopen, socket.onclose and socket.onerror handlers to keep the user informed of these conditions. The socket.onmessage handler deals with incoming messages from the Channel API: it parses each one as JSON and converts it to a new table row using jQuery's excellent DOM generation support, then appends the row to the results table. We already covered in last week's post how matched results are handled by the MatchHandler, which then calls channel.send_message. Messages sent there are received directly by the onmessage handler.
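Since the onmessage handler reads data.data[columns[i]], each message has to be a JSON object with a data member keyed by column name. Here's a minimal sketch of how such a payload could be built on the server side. The function name build_channel_message is hypothetical, and the real MatchHandler from last week's post may well include other fields; only the data wrapper is implied by the client code:

```python
import json

# Same column names the admin page uses to build each table row.
COLUMNS = ['method', 'path', 'status_code', 'wall_time', 'cpu_time']


def build_channel_message(record):
    """Serialize one request record (a dict) for the onmessage handler.

    Hypothetical helper: keeps only the columns the page displays and
    wraps them in a 'data' member, as the client-side code expects.
    """
    row = dict((name, record.get(name)) for name in COLUMNS)
    return json.dumps({'data': row})
```

The resulting string is what you'd hand to channel.send_message along with the client ID.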

Finally, here's the code that handles clicks on the 'new query' button:

      $("#querybutton").click(function(event) {
        var subdata = {
          'query': $("#query").val(),
          'client_id': client_id,
        };
        $.post('{{config.BASE_URL}}/subscribe', subdata, function(data) {
          subscriptions.push(data);
          add_message('Subscription added with ID ' + data);
        });
      });
    });
  </script>

All we do here is construct a new subscription request, consisting of the query and our client ID, and send it to the SubscribeHandler. When we get a response, we record the new subscription ID and log an informative message to the results table.

That, surprisingly, is all we have to do to provide the admin interface of our traffic inspector. All the major parts are in place now: The middleware that intercepts requests and logs them to the Prospective Search API, the subscription handler to put new subscriptions in place to match results, the admin interface which establishes a channel, and the match handler that sends matched results to the client.

There's one final piece of cleanup we should do, however: Whenever a client disconnects from the Channel API, we should delete any subscriptions it has, so they don't sit around cluttering up the datastore. Although the matcher subscription will eventually expire, we should delete that too, to save on resources. Here's the code that does that, given a client ID:

def handle_disconnection(client_id):
  """Handles a channel disconnection for a Clio channel."""
  # Find all their subscriptions and delete them.
  q = model.Subscription.all().filter('client_id =', client_id)
  subscriptions = q.fetch(1000)
  for sub in subscriptions:
    prospective_search.unsubscribe(model.RequestRecord, str(sub.key()))
  db.delete(subscriptions)

Note we unsubscribe individually from each subscription, since there's no 'bulk unsubscribe' option for the Prospective Search API, but we delete all the subscriptions from the datastore in a single batch, to cut down on RPCs.

This function needs to be called from somewhere, of course, and for that we use the new connection and disconnection notification support in the Channel API. First, we enable the channel_presence inbound service in app.yaml:

inbound_services:
- channel_presence

Once we've done that, the app will receive POST requests at /_ah/channel/connected/ and /_ah/channel/disconnected/ whenever a client connects or disconnects, respectively. Since these are app-wide, and an application might use the Channel API for more than just Clio, we've provided the above function for another handler to call. For convenience, we'll also define our own handlers as part of Clio, which can be used directly if your app doesn't use the Channel API for anything else:

class ChannelConnectHandler(webapp.RequestHandler):
  def post(self):
    pass


class ChannelDisconnectHandler(webapp.RequestHandler):
  def post(self):
    handle_disconnection(self.request.get('from'))

Using these requires us to add another mapping to app.yaml, since Clio currently only handles requests to /_clio/.*:

- url: /_ah/channel/.*
  script: clio/handler.py

A more sophisticated implementation might take advantage of the fact that we're already intercepting requests to most handlers via middleware to avoid the need to manually hook this component up.
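As a rough sketch of that idea, here's a small WSGI middleware that watches for the disconnection notification, calls a cleanup function, and then passes the request on untouched so the app's own handlers still see it. It assumes the notification actually reaches a WSGI app wrapped by the middleware, and that the client ID arrives as a form-encoded 'from' field (which is what self.request.get('from') reads above). ChannelDisconnectMiddleware and its on_disconnect argument are made-up names, not part of Clio:

```python
import io

try:
    from urllib.parse import parse_qs  # Python 3
except ImportError:
    from urlparse import parse_qs  # Python 2

DISCONNECT_PATH = '/_ah/channel/disconnected/'


class ChannelDisconnectMiddleware(object):
    """Hypothetical middleware: run a cleanup callback on channel disconnects."""

    def __init__(self, app, on_disconnect):
        self.app = app
        self.on_disconnect = on_disconnect  # e.g. handle_disconnection

    def __call__(self, environ, start_response):
        if (environ.get('REQUEST_METHOD') == 'POST'
                and environ.get('PATH_INFO') == DISCONNECT_PATH):
            length = int(environ.get('CONTENT_LENGTH') or 0)
            raw = environ['wsgi.input'].read(length)
            client_id = parse_qs(raw.decode('utf-8')).get('from', [''])[0]
            if client_id:
                self.on_disconnect(client_id)
            # Restore the body so the wrapped app can still read it.
            environ['wsgi.input'] = io.BytesIO(raw)
        return self.app(environ, start_response)
```

Wrapping an application would then look like ChannelDisconnectMiddleware(application, handle_disconnection). Passing the request through, rather than answering it in the middleware, keeps this compatible with apps that use the Channel API for other purposes too.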

And with that, we're done: we have a complete, if rather basic, system for monitoring site traffic in real time. The source, as before, can be found here, though it would require some expansion in order to be useful in a real production environment, starting with a better and more flexible UI. Anyone keen? ;)
