Tags: distributed-computing, consistent-hashing

Ketama with primary and secondary nodes


I have a number of keys I need to store data for across many machines. I've got this working using a Ketama consistent hashing library, but for every key I'd like 2 machines to store that key's data, a primary and a secondary.

I can imagine going counter-clockwise instead of clockwise (e.g. floorEntry instead of ceilingEntry) to find the secondary machine, but that would require a change in the library which I did not author.

Is there a way to achieve this without modifying the library? One idea is to rotate the hash 180 "degrees" around the ring, but I'm unsure how to do that.

Bonus/optional: how to find a tertiary machine in addition to primary and secondary for a given key?


Solution

  • Found a simple solution in a paper on distributed key-value stores, The PRO key-value store.

    When a key-value pair is stored, the first node clockwise from (i.e. at or after) the key's hash value is the primary, and the next distinct node after it (its successor) is the secondary.

    I figure out the "next" node by keeping an index of primary->secondary nodes. Support for n backup nodes would be as simple as building a map of Node->List[Node] in a similar fashion. In Scala, using Twitter's KetamaDistributor, this could look like:

    import scala.collection.SortedSet
    // KetamaNode, KetamaDistributor and KeyHasher live in Twitter's
    // util-hashing library (com.twitter.hashing)
    import com.twitter.hashing.{KetamaDistributor, KetamaNode, KeyHasher}
    import Partitioner._
    
    case class Partition(page: String, primary: String, secondary: String)
    
    class Partitioner(pages: Seq[String], nodes: SortedSet[String]) {
    
      val ketamaNodes = nodes.toSeq.map { host => KetamaNode(host, defaultNodeWeight, host) }
      val ketamaDistributor = new KetamaDistributor(ketamaNodes, numReps)
    
      // Build a map of primary -> secondary: each node maps to its clockwise
      // successor, with the last node wrapping around to the first
      val nodeIndex: Map[String, String] = nodes.toSeq.sliding(2).foldLeft(Map[String, String]()) {
        case (acc, Seq(x, y)) => acc.updated(x, y)
      } ++ Map(nodes.last -> nodes.head)
    
      def partitions = {
        pages.map { page =>
          val hash = KeyHasher.KETAMA.hashKey(page.getBytes("UTF-8"))
          val primary = ketamaDistributor.nodeForHash(hash)
          Partition(page, primary, nodeIndex(primary))
        }
      }
    
    }
    
    object Partitioner {
      val numReps = 160
      val defaultNodeWeight = 100
    }
    

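    As an aside on the n-backup generalization mentioned above: the successor index extends naturally by recording, for each node, its next n distinct successors on the sorted node list. A minimal sketch in plain Scala (no Ketama dependency; the helper name replicaIndex is illustrative):

    ```scala
    import scala.collection.SortedSet

    // For each node, record its next `n` successors in sorted order,
    // wrapping around the end of the list back to the start.
    def replicaIndex(nodes: SortedSet[String], n: Int): Map[String, List[String]] = {
      val ordered = nodes.toList
      // Doubling the list lets the sliding windows wrap past the end of the ring
      (ordered ++ ordered).sliding(n + 1).take(ordered.size).map { window =>
        window.head -> window.tail
      }.toMap
    }
    ```

    For example, replicaIndex(SortedSet("a", "b", "c"), 2) yields Map(a -> List(b, c), b -> List(c, a), c -> List(a, b)); looking up a key's primary then gives its secondary, tertiary, and so on.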
    Usage looks like:

    def uuid = java.util.UUID.randomUUID.toString
    val nodes = SortedSet((1 to 6).map(i => s"machine-$i"): _*)
    val pages = (1 to 100).map(_ => uuid)
    val partitioner = new Partitioner(pages, nodes)
    val partitions = partitioner.partitions
    // find the primary and secondary server for the first page
    partitions.head
    // => Partition(f7eba506-e366-4cf3-ad72-4992fc5431b0,machine-5,machine-6)
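
    For the bonus (tertiary) part, the same clockwise walk can also be done directly on a sorted ring, without maintaining an index at all: keep stepping to the next position and skip hosts you have already seen until n distinct ones are found. A self-contained sketch using java.util.TreeMap as a stand-in for the library's internal continuum (the positions and hosts below are made up for illustration):

    ```scala
    import java.util.TreeMap

    // Toy continuum: position on the ring -> host. In real Ketama each host
    // appears at many positions (its replica points); a few suffice here.
    val ring = new TreeMap[Long, String]()
    Seq(100L -> "machine-1", 200L -> "machine-2", 300L -> "machine-1", 400L -> "machine-3")
      .foreach { case (pos, host) => ring.put(pos, host) }

    // Walk clockwise from the key's hash, collecting the first `n` distinct
    // hosts: element 1 is the primary, element 2 the secondary, 3 the tertiary.
    def nodesFor(hash: Long, n: Int): List[String] = {
      val start = Option(ring.ceilingKey(hash)).getOrElse(ring.firstKey)
      Iterator.iterate(start)(k => Option(ring.higherKey(k)).getOrElse(ring.firstKey))
        .take(ring.size) // at most one full lap around the ring
        .map(ring.get)
        .distinct
        .take(n)
        .toList
    }
    ```

    Here nodesFor(250L, 3) returns List(machine-1, machine-3, machine-2): the walk visits positions 300 and 400 first, then wraps around to 100 and 200, skipping the duplicate machine-1 at position 100.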