Implementing a DHT in Go, part 2

In the previous post, we started a straightforward implementation of a Kademlia Distributed Hash Table in Go. Today, we'll add in the real meat - interaction between peers to find nodes.

First, though, a correction to the previous post. There were a couple of errors in it, the most notable of which is that the routing table's FindClosest method failed to order the returned results correctly. The original implementation ordered results by their absolute node ID, but the correct ordering is by their ID xor the target ID. For example, with a target ID of 0110, the node 0111 (xor distance 0001) is closer than the node 0001 (xor distance 0111), even though 0001 has the smaller absolute ID. To my shame, my rudimentary unit tests did not catch this. This is now fixed in the original article with the introduction of a 'ContactRecord' struct.

Let's start by defining a Kademlia struct to hold information about a Kademlia network:

type Kademlia struct {
  routes *RoutingTable;
  NetworkId string;
}

func NewKademlia(self *Contact, networkId string) (ret *Kademlia) {
  ret = new(Kademlia);
  ret.routes = NewRoutingTable(self);
  ret.NetworkId = networkId;
  return;
}

Note the presence of the 'NetworkId' field in the above code. This is an arbitrary string that should be unique for each deployment of our Kademlia implementation, to prevent different instances of the network merging together.
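For concreteness, here's a minimal sketch of bringing up a node. NewRandomNodeID is a hypothetical placeholder for whatever NodeID constructor the previous post's code provides, and the address and network ID strings are arbitrary:

// Sketch only: NewRandomNodeID stands in for a NodeID-generating helper from
// the previous post; substitute whatever your implementation provides.
self := &Contact{id: NewRandomNodeID(), address: "127.0.0.1:8989"};
k := NewKademlia(self, "my-test-network");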

Go supports RPC calls using its built-in rpc package. RPCs are handled by exporting a struct; all exported methods on that struct that meet certain requirements then become available for RPC calls. Go supports RPC calls over HTTP as well as over raw sockets; we'll use the former, as it's slightly easier to use. To start, we'll define a struct that will export the 'core' Kademlia RPCs, and a common header for RPC requests and responses:

type RPCHeader struct {
  Sender *Contact;
  NetworkId string;
}

func (k *Kademlia) HandleRPC(request, response *RPCHeader) os.Error {
  if request.NetworkId != k.NetworkId {
    return os.NewError(fmt.Sprintf("Expected network ID %s, got %s",
                                   k.NetworkId, request.NetworkId));
  }
  if request.Sender != nil {
    k.routes.Update(request.Sender);
  }
  response.Sender = &k.routes.node;
  return nil;
}

type KademliaCore struct {
  kad *Kademlia;
}

The HandleRPC method is a helper function that should be called for every incoming RPC. It takes care of checking that the network ID is correct, updating the routing table with the details of the peer that just sent us an RPC, and including our own details in the reply message. Let's see a trivial example of an RPC - the Ping RPC:

type PingRequest struct {
  RPCHeader;
}

type PingResponse struct {
  RPCHeader;
}

func (kc *KademliaCore) Ping(args *PingRequest, response *PingResponse) (err os.Error) {
  if err = kc.kad.HandleRPC(&args.RPCHeader, &response.RPCHeader); err == nil {
    log.Stderrf("Ping from %s\n", args.RPCHeader.Sender);
  }
  return;
}

The Ping RPC does literally nothing, except provide an opportunity for peers to check that we're still alive, and update both our routing tables with that fact. Note the way we use Go's if statement in Ping: because an if statement can take an arbitrary initialization statement followed by a condition expression, we can use it as a guard clause that executes HandleRPC, and then executes the body of the block iff HandleRPC was successful. The use of a named return parameter means that if the call is unsuccessful, the error is automatically returned to the caller.
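Stripped of the RPC details, the shape of that idiom is roughly this; stepOne and stepTwo are hypothetical placeholders for any two calls that return an os.Error:

// The guard-clause pattern in isolation. 'err' is the named return value, so
// if stepOne fails we skip the block and fall through to 'return', handing the
// error straight back to the caller.
func doBothSteps() (err os.Error) {
  if err = stepOne(); err == nil {
    err = stepTwo();
  }
  return;
}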

Registering the RPC server with Go works like this:

func (k *Kademlia) Serve() (err os.Error) {
  rpc.Register(&KademliaCore{k});

  rpc.HandleHTTP();
  var l net.Listener;
  if l, err = net.Listen("tcp", k.routes.node.address); err == nil {
    go http.Serve(l, nil);
  }
  return;
}

First we construct a new KademliaCore instance, and register it with the RPC module. This makes use of a couple of Go features I quite like. First, Go has good support for composite literals, so we can construct a struct using a single literal expression. Second, Go uses escape analysis to determine when something should be allocated on the heap instead of the stack. If we were to attempt to do what you see above in C, we'd be passing a pointer to a struct allocated on the stack, which would fail horribly when it was first accessed after our Serve function returned. Go detects this, however, and allocates our KademliaCore instance on the heap, making the above a much neater version of this:

kc := new(KademliaCore);
kc.kad = k;
rpc.Register(kc);

Then, we instruct the RPC module that we want it to handle RPC-over-HTTP calls. I'm not too happy about the design decision, here and in Register, to use global state - it makes it impossible to have more than one RPC server exporting different sets of RPCs on different ports, for instance. Hopefully this will be fixed in future versions of the library.

Finally, we call net.Listen to listen on a TCP port, and call http.Serve on the new listener in a goroutine, to handle incoming HTTP requests.

Let's examine a slightly less trivial RPC call, the FindNode RPC:

type FindNodeRequest struct {
  RPCHeader;
  Target NodeID;
}

type FindNodeResponse struct {
  RPCHeader;
  Contacts []Contact;
}

func (kc *KademliaCore) FindNode(args *FindNodeRequest, response *FindNodeResponse) (err os.Error) {
  if err = kc.kad.HandleRPC(&args.RPCHeader, &response.RPCHeader); err == nil {
    contacts := kc.kad.routes.FindClosest(args.Target, BucketSize);
    response.Contacts = make([]Contact, contacts.Len());

    for i := 0; i < contacts.Len(); i++ {
      response.Contacts[i] = *contacts.At(i).(*ContactRecord).node;
    }
  }
  return;
}

This RPC is a wrapper around the local FindClosest call on the RoutingTable struct. After doing the standard HandleRPC call successfully, we call FindClosest, then iterate over its results, adding them to an array (actually a slice) of results. Again, the RPC module seamlessly handles serializing and deserializing the RPC and its arguments.

Let's look at how a client makes an RPC. To facilitate this, we can define a Call method on the Kademlia struct:

func (k *Kademlia) Call(contact *Contact, method string, args, reply interface{}) (err os.Error) {
  var client *rpc.Client;
  if client, err = rpc.DialHTTP("tcp", contact.address); err == nil {
    err = client.Call(method, args, reply);
    if err == nil {
      k.routes.Update(contact);
    }
  }
  return;
}

This method sends an RPC to a peer by connecting to its address over HTTP with rpc.DialHTTP. This returns a client object, on which we then call the Call method, passing the method name (a string of the form 'Struct.Method') and the args and reply structs. If the call was successful, we update the routing table.
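As a usage example, pinging a contact through this helper looks something like the sketch below, which mirrors how sendQuery (later in this post) constructs its FindNode request; assume 'contact' is a *Contact we already know about:

// Sketch: send a Ping RPC to 'contact', giving the far side's HandleRPC a
// chance to record us in its routing table.
args := PingRequest{RPCHeader{&k.routes.node, k.NetworkId}};
reply := PingResponse{};
if err := k.Call(contact, "KademliaCore.Ping", &args, &reply); err != nil {
  log.Stderrf("Ping to %s failed: %s\n", contact.address, err);
}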

I've been saving the hardest part for last, though. The most important part of Kademlia's design is how it finds nodes and data in the network. It does this using an iterative lookup procedure. In short, it starts off with a list of nodes closest to the target ID, retrieved from the local routing table. It repeatedly queries the closest few nodes that it hasn't yet queried, asking them for the closest nodes _they_ know about. When it receives a response, it updates its result list, and repeats. This procedure is done in parallel, with several RPCs in flight at the same time.

Typically, the procedure terminates either when an RPC call returns no new results, or when every contact in the result set has been queried. In our case, we'll use a slightly simpler criterion: we'll stop only when we no longer have any unqueried nodes.

func (k *Kademlia) sendQuery(node *Contact, target NodeID, done chan []Contact) {
  args := FindNodeRequest{RPCHeader{&k.routes.node, k.NetworkId}, target};
  reply := FindNodeResponse{};
  
  if err := k.Call(node, "KademliaCore.FindNode", &args, &reply); err == nil {
    done <- reply.Contacts;
  } else {
    done <- []Contact{};
  }
}

This is the sendQuery method. It takes care of sending a FindNode RPC to the specified contact, then, when it replies, sends the results - a list of contacts - to the channel provided to signal completion. The method's designed this way so that we can call sendQuery in a new goroutine, and it will signal us with its results when it is done.

func (k *Kademlia) IterativeFindNode(target NodeID, delta int) (ret *vector.Vector) {
  done := make(chan []Contact);

  // A vector of *ContactRecord structs
  ret = new(vector.Vector).Resize(0, BucketSize);
  
  // A heap of not-yet-queried *Contact structs
  frontier := new(vector.Vector).Resize(0, BucketSize);
  
  // A set of node IDs we've already seen
  seen := make(map[string] bool);

This is the start of the IterativeFindNode function. You can see here where we declare the 'done' channel that instances of sendQuery will use. We also initialize the return Vector, which will store ContactRecord pointers; another vector called 'frontier', which will be structured as a heap and will store Contact pointers for contacts we have not yet queried; and a map of node IDs we've already seen, to prevent us accidentally adding the same node to the return or frontier vectors multiple times. Map keys in Go have to be of a type that supports equality comparisons, so we serialize our NodeIDs to strings to use as keys in this map.

  // Initialize the return list, frontier heap, and seen list with local nodes
  for node := range k.routes.FindClosest(target, delta).Iter() {
    record := node.(*ContactRecord);
    ret.Push(record);
    heap.Push(frontier, record.node);
    seen[record.node.id.String()] = true;
  }
  
  // Start off delta queries
  pending := 0;
  for i := 0; i < delta && frontier.Len() > 0; i++ {
    pending++;
    go k.sendQuery(heap.Pop(frontier).(*Contact), target, done);
  }

Here, we ask the local routing table for the contacts it knows of that are closest to the target to start us off, and step through them. For each one, we add it to the return values, push it onto the heap, and mark it as seen. Then, we start off a set of concurrent RPCs using our sendQuery method. A counter variable, 'pending', keeps track of how many in-flight RPCs we have.

  // Iteratively look for closer nodes
  for pending > 0 {
    nodes := <-done;
    pending--;
    for i := 0; i < len(nodes); i++ {
      node := &nodes[i];
      // If we haven't seen the node before, add it
      if _, ok := seen[node.id.String()]; !ok {
        ret.Push(&ContactRecord{node, node.id.Xor(target)});
        heap.Push(frontier, node);
        seen[node.id.String()] = true;
      }
    }
    
    for pending < delta && frontier.Len() > 0 {
      go k.sendQuery(heap.Pop(frontier).(*Contact), target, done);
      pending++;
    }
  }

This is the core of our code. We loop as long as there are queries in-flight (Go has no 'while' loop - it uses a modified 'for' loop for everything), reading results from the done channel. This operation will block until one of our pending RPCs completes. When we have results, we iterate through them, adding any we haven't seen before to the return vector and the frontier heap, and marking them as seen. Then, if there are nodes still in the frontier heap, we start off new RPCs to query them, up to our specified concurrency limit. This is in a loop, rather than just an if statement, because while one RPC could return nothing new (resulting in no new RPCs being started), a subsequent RPC could return several new results.

An important caveat here is that we're not accounting for unresponsive nodes. If a node is queried and never responds, we'll wait here indefinitely. A real-world implementation would set a timeout for each RPC, and give up on outstanding RPCs if they take too long; there's a sketch of one way to do that after the end of the function below.

  sort.Sort(ret);
  if ret.Len() > BucketSize {
    ret.Cut(BucketSize, ret.Len());
  }

  return;
}

Finally, we wrap up by sorting the results and truncating them to the number expected.
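As mentioned above, this implementation will block forever on an unresponsive node. For illustration only, one way to bound the wait is to select on the done channel and a timer in the receive loop. This is a sketch assuming a fixed ten-second budget, and it also assumes the done channel is created with a buffer (for example make(chan []Contact, delta)) so that a late-replying sendQuery goroutine can still send its result and exit rather than blocking forever:

// Sketch of a bounded wait; this replaces the plain 'nodes := <-done' receive.
var nodes []Contact;
select {
case nodes = <-done:
  // A query completed normally.
case <-time.After(10e9):
  // Ten seconds (in nanoseconds) passed with no reply; carry on as if the
  // query returned nothing.
  nodes = []Contact{};
}
pending--;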

That, in a nutshell, is the core functionality in a Kademlia implementation. All other operations build on the IterativeFindNode operation in one manner or another. In the next post, we'll demonstrate this by adding support for storing and retrieving values, making the implementation into a proper hash table.

One major caveat: Because I'm writing this on-the-fly, it's possible - nay, likely - that there are major bugs in the implementation so far. Unit testing can only catch so much, so don't be surprised if my next post once again contains an embarrassed admission that I got it wrong somehow. If you see a bug, shout out!
