Consuming RSS feeds with PubSubHubbub

Frequently, it's necessary or useful to consume an Atom or RSS feed provided by another application. Doing so, though, is rarely as simple as it seems: To do so robustly, you have to worry about polling frequency, downtime, badly formed feeds, multiple formats, timeouts, determining which items are new and other such issues, all of which distract from your original, seemingly simple goal of retrieving new updates from an Atom feed. You're not alone, either: Everyone ends up dealing with the same set of issues, and solving them in more or less the same manner. Wouldn't it be nice if there was a way to let someone else take care of all this hassle?

As you've no doubt guessed, I'm about to tell you that there is. I'm speaking, of course, of PubSubHubbub. I discussed publishing to PubSubHubbub as part of the Blogging on App Engine series, but I haven't previously discussed what's required to act as a subscriber. Today, we'll cover the basics of PubSubHubbub subscriptions, and how you can use them to outsource all the usual issues involved in consuming feeds.

At this point, you may be wondering how this is useful if the feed you're consuming doesn't support PubSubHubbub. Fortunately, the PubSubHubbub protocol provides for the possibility of hubs doing polling on feeds that do not support PubSubHubbub themselves. The public hub on http://pubsubhubbub.appspot.com/ doesn't currently have this enabled, but there are plenty of alternatives. First and foremost, you can run your own hub. The reference implementation is an App Engine app, so you can deploy it the same way you do your regular app. You can even deploy the hub as an alternate version under the same App ID, providing you with a 'private' hub that you can access the same way you would any other hub.

An easier alternative, however, is to use a hub provider that already supports polling. One such provider is superfeedr, who provide services for both publishers and subscribers. They're a commercial outfit, but they offer a "hackr plan", which is free if you monitor fewer than 1000 feeds - and their rates seem very reasonable. For simplicity, we'll be demonstrating subscriptions using their service, but the rest of the article applies equally to any other hub.

First, sign up to superfeedr. Once you've signed up and verified your account, you're ready to go!

Subscribing to a feed using hubbub is a three-stage process:

  1. Send a subscription request to the hub
  2. Handle the subscription callback
  3. Process notifications

We'll go over each of these steps, using an example app that allows users to receive notifications of new posts over XMPP. First, we need to define models to keep track of subscriptions and individual subscribers:

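from google.appengine.ext import db

class Subscription(db.Model):
  @property
  def url(self):
    # The feed URL doubles as the entity's key name.
    return self.key().name()

  verify_token = db.StringProperty(required=True)  # Random verification token.

class Subscriber(db.Model):
  @property
  def subscription(self):
    # parent() is a method on db.Model, so it has to be called.
    return self.parent()

  @property
  def address(self):
    # The subscriber's XMPP address is stored as the key name.
    return self.key().name()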

We're making heavy use of entity relationships and key names here. To enforce uniqueness, the key name of a Subscription entity is the URL of its feed, and Subscriber entities are child entities of Subscriptions, with their key name being the XMPP address of the subscriber.

Sending subscription requests

Now we can handle the first part of subscribing to a feed: Sending the request to the hub. Doing so is a straightforward matter of sending an HTTP POST request to the correct URL, as detailed in the hubbub spec. We'll do so when a user asks to be subscribed, using an XMPP Handler:

class XmppHandler(xmpp_handlers.CommandHandler):
  def send_subscription_request(self, subscription):
    subscribe_args = {
        'hub.callback': urlparse.urljoin(self.request.url, '/hubbub'),
        'hub.mode': 'subscribe',
        'hub.topic': subscription.url,
        'hub.verify': 'async',
        'hub.verify_token': subscription.verify_token,
    }
 
    headers = {}
 
    if HUB_CREDENTIALS:
      auth_string = "Basic " + base64.b64encode("%s:%s" % HUB_CREDENTIALS)
      headers['Authorization'] = auth_string
 
    response = urlfetch.fetch(HUB_URL, payload=urllib.urlencode(subscribe_args),
                              method=urlfetch.POST, headers=headers)
 
 
  def subscribe_command(self, message):
    if not message.arg.startswith("http"):
      message.reply("Subscription requests must consist of a URL to subscribe to")
      return
 
    created, subscription, subscriber = db.run_in_transaction(
        add_subscription,
        message.arg,  # URL to subscribe to
        message.sender,  # User who is subscribing
    )
 
    if created:
      self.send_subscription_request(subscription)
    
    message.reply("Subscription created!")

When a user sends a message starting with '/subscribe', the 'subscribe_command' method is called. After doing some basic validation of the argument, it calls 'add_subscription' inside a datastore transaction, which returns the subscription and subscriber entities, along with a flag indicating whether the subscription was newly created. Running this in a transaction is necessary to make sure we don't subscribe to the same feed multiple times. Here's the code for add_subscription:

def add_subscription(topic, recipient):
  created = False
  subscription = Subscription.get_by_key_name(topic)
  if not subscription:
    created = True
    subscription = Subscription(key_name=topic, verify_token=str(uuid.uuid4()))
  subscriber = Subscriber(key_name=recipient, parent=subscription)
  db.put([subscription, subscriber])
  return created, subscription, subscriber

If this user is the first to subscribe to this feed, the send_subscription_request method is called. This constructs a dictionary of arguments for the subscription request, consisting of the URL to send callbacks and updated entries to, the mode ('subscribe'), the topic we're subscribing to, and a couple of verification arguments - 'hub.verify' and 'hub.verify_token'. The first one specifies that we're happy to handle the verification callback after the current request has completed, and the second argument provides a secret token that only we and the hub know of. This is to make it impossible for other people to subscribe us to a feed without our permission, as we'll see shortly.

After assembling the dictionary of subscription arguments, we deal with authorization. Public hubs, like http://pubsubhubbub.appspot.com/, don't require any authentication, but other providers, such as superfeedr, do. If we provided credentials (a (username, password) tuple in the HUB_CREDENTIALS variable), we add those to the request. Finally, we make the subscription request using urlfetch.
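To make the shape of that request concrete, here's a minimal sketch that builds the same form-encoded body and Basic auth header using only the Python 3 standard library, without App Engine's urlfetch. The function name build_subscription_request and the example.com URLs are made up for illustration; only the hub.* parameter names come from the code above.

```python
import base64
from urllib.parse import urlencode


def build_subscription_request(callback, topic, verify_token, credentials=None):
    """Return (body, headers) for a hub subscription POST.

    `credentials` is an optional (username, password) tuple, mirroring
    HUB_CREDENTIALS in the handler above.
    """
    body = urlencode({
        'hub.callback': callback,
        'hub.mode': 'subscribe',
        'hub.topic': topic,
        'hub.verify': 'async',
        'hub.verify_token': verify_token,
    })
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    if credentials:
        # HTTP Basic auth: base64 of "username:password".
        raw = ('%s:%s' % credentials).encode('utf-8')
        headers['Authorization'] = 'Basic ' + base64.b64encode(raw).decode('ascii')
    return body, headers


# Hypothetical values, purely for illustration.
body, headers = build_subscription_request(
    'http://example.com/hubbub', 'http://example.com/feed.atom', 'token123',
    credentials=('user', 'secret'))
```

The returned body and headers can then be handed to whatever HTTP client you happen to be using.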

Handling subscription callbacks

Part 2 is handling the subscription callback from the hub. The hub does this to make sure that nobody else forged the subscription request, and to make sure that we are operating a valid endpoint. This is where the verify_token parameter from above comes in: When we receive a subscription callback, we should check that the hub.verify_token argument the hub is supplying matches the one we stored when we made the request. If it does, we respond to the request by echoing back the 'hub.challenge' string it sends us, to confirm that we really want to subscribe. Here's how we handle it in our app:

class CallbackHandler(webapp.RequestHandler):
  def get(self):
    if self.request.GET['hub.mode'] == 'unsubscribe':
      self.response.headers['Content-Type'] = 'text/plain'
      self.response.out.write(self.request.GET['hub.challenge'])
      return
      
    if self.request.GET['hub.mode'] != 'subscribe':
      self.error(400)
      return
 
    subscription = Subscription.get_by_key_name(self.request.GET['hub.topic'])
    if not subscription or subscription.verify_token != self.request.GET['hub.verify_token']:
      self.error(400)
      return
 
    self.response.headers['Content-Type'] = 'text/plain'
    self.response.out.write(self.request.GET['hub.challenge'])

As you can see, this is very straightforward: We check that it's a subscription request ('hub.mode' is 'subscribe'), then we fetch the subscription and check that the tokens match. If all is well, we echo back the challenge string in the response, which is how hubbub verifies that we're okay with the subscription request. Note that the handler also confirms any unsubscribe verification by echoing the challenge without checking the token.
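The decision logic is easier to test if you pull it out of the request handler. Here's a rough sketch of the subscribe branch as a plain function; verify_callback and the stored_tokens dictionary (standing in for the Subscription entities) are hypothetical names, and the 400 status simply mirrors what the handler above returns.

```python
def verify_callback(params, stored_tokens):
    """Decide how to answer a hub verification GET.

    params: dict of query-string arguments sent by the hub.
    stored_tokens: dict mapping topic URL -> the verify token we generated
        (a stand-in for looking up Subscription entities).
    Returns (status_code, response_body).
    """
    challenge = params.get('hub.challenge')
    if params.get('hub.mode') != 'subscribe' or challenge is None:
        return 400, ''
    expected = stored_tokens.get(params.get('hub.topic'))
    if expected is None or expected != params.get('hub.verify_token'):
        # Unknown topic or wrong token: refuse to confirm.
        return 400, ''
    # Echoing the challenge tells the hub we agree to the subscription.
    return 200, challenge


tokens = {'http://example.com/feed.atom': 'token123'}
status, body = verify_callback({
    'hub.mode': 'subscribe',
    'hub.topic': 'http://example.com/feed.atom',
    'hub.verify_token': 'token123',
    'hub.challenge': 'abc',
}, tokens)
```

Unlike the handler above, this sketch rejects anything that isn't a subscribe verification, including unsubscribe requests.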

Processing updates

Now that the subscription process is out of the way, we can handle the updates themselves. For this, we'll use the Universal Feed Parser library, though since the hub processes and sanitizes the feed, we could just as easily use a standard XML parser. Since new entries are sent as a POST request to the same URL as the subscription callback, we add a post() method to our CallbackHandler:

  def post(self):
    """Handles new content notifications."""
    feed = feedparser.parse(self.request.body)
    feed_url = find_self_url(feed.feed.links)
    subscription = None
    if feed_url:
      subscription = Subscription.get_by_key_name(feed_url)
    # Bail out before querying: ancestor() needs a real entity.
    if not subscription:
      logging.warning("Discarding update from unknown feed '%s'", feed_url)
      return
    # Subscriber addresses are stored as key names, so a keys-only query is enough.
    subscriber_keys = Subscriber.all(keys_only=True).ancestor(subscription).fetch(1000)
    subscriber_addresses = [x.name() for x in subscriber_keys]
    for entry in feed.entries:
      message = "%s (%s)" % (entry.title, entry.link)
      xmpp.send_message(subscriber_addresses, message)

def find_self_url(links):
  for link in links:
    if link.rel == 'self':
      return link.href
  return None

Here, we parse the request body with the Universal Feed Parser, and extract the feed's 'self' URL using a helper function. We then use that 'self' URL to retrieve the Subscription entity, and for each item in the feed, we notify all the subscribers of the update. Note that because we store the subscribers' XMPP addresses as key names, we don't need to fetch the Subscriber entities themselves - just their keys.
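If you'd rather take the "standard XML parser" route mentioned earlier, something like the following sketch would do the same job with ElementTree. It assumes the hub delivers well-formed Atom (it makes no attempt to handle RSS), and parse_atom_update and the sample document are invented for illustration.

```python
import xml.etree.ElementTree as ET

ATOM = '{http://www.w3.org/2005/Atom}'


def parse_atom_update(body):
    """Return (self_url, [(title, link), ...]) from an Atom document."""
    root = ET.fromstring(body)
    self_url = None
    for link in root.findall(ATOM + 'link'):
        if link.get('rel') == 'self':
            self_url = link.get('href')
            break
    entries = []
    for entry in root.findall(ATOM + 'entry'):
        title = entry.findtext(ATOM + 'title', default='')
        href = None
        for link in entry.findall(ATOM + 'link'):
            # An Atom link without a rel attribute is treated as "alternate".
            if link.get('rel', 'alternate') == 'alternate':
                href = link.get('href')
                break
        entries.append((title, href))
    return self_url, entries


SAMPLE = b"""<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>Example</title>
  <link rel="hub" href="http://hub.example.com/"/>
  <link rel="self" href="http://example.com/feed.atom"/>
  <entry>
    <title>First post</title>
    <link href="http://example.com/posts/1"/>
    <id>http://example.com/posts/1</id>
    <updated>2010-01-01T00:00:00Z</updated>
  </entry>
</feed>"""

self_url, entries = parse_atom_update(SAMPLE)
for title, link in entries:
    print("%s (%s)" % (title, link))
```

The 'self' URL plays the same role as the return value of find_self_url above: it's the key used to look up the subscription.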

That's it - you now never have to worry about polling intervals, sanitization, or unavailable feeds again! The full source for the example app in this post is here, and you can try it out by messaging xmpphubbub@appspot.com. A few caveats before you go, though:

  • There's no unsubscribe in the example - so be careful what you subscribe to!
  • Real code would have more error checking, such as verifying that the response to the subscribe request was a 2xx, and that the callback is actually made (sooner or later).
  • Superfeedr doesn't support automatic subscription renewal - so if you want to know for certain that it hasn't forgotten about a subscription, and you haven't heard from it in a bit, you'd better re-subscribe.
  • Superfeedr also doesn't support "authenticated content distribution", a mode that uses a shared secret to generate an HMAC signature for updates. In my mind, this is a major omission - because it means that anyone who knows your callback URL can invent RSS updates at will! Edit: superfeedr does support authenticated content distribution.
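For the curious, here's roughly what checking an authenticated update looks like. As I read the PubSubHubbub spec, you supply a 'hub.secret' when subscribing, and the hub then signs each notification body with HMAC-SHA1 and sends the result in an X-Hub-Signature header of the form 'sha1=<hex digest>'. Treat the sketch below as an illustration under that assumption rather than a drop-in implementation; signature_is_valid is a made-up name.

```python
import hashlib
import hmac


def signature_is_valid(secret, body, header_value):
    """Check an X-Hub-Signature header of the form 'sha1=<hex digest>'.

    secret and body are bytes; header_value is the raw header string
    (or None if the header was missing).
    """
    prefix = 'sha1='
    if not header_value or not header_value.startswith(prefix):
        return False
    expected = hmac.new(secret, body, hashlib.sha1).hexdigest()
    supplied = header_value[len(prefix):]
    # compare_digest avoids leaking how many leading characters matched.
    return hmac.compare_digest(expected.encode('utf-8'), supplied.encode('utf-8'))


# Hypothetical secret and payload, purely for illustration.
secret = b'shared-secret'
body = b'<feed>...</feed>'
good = 'sha1=' + hmac.new(secret, body, hashlib.sha1).hexdigest()
```

In the callback's post() method, you'd run a check like this against the raw request body before parsing it, and discard the update if it fails.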

Those caveats aside, I'm confident that if you compare this solution to implementing your own polling infrastructure, you'll find that it comes out significantly simpler. Plus, as soon as the publishers of your feeds start using Hubbub, you'll get instant updates!
