
barrier for multiple nodes


I'm trying to get the same functionality as a barrier in interprocess communication, but now for distributed nodes.

My problem is that I have multiple processes distributed over several nodes. Each has a non-deterministic setup phase, and once setup is done I want them all to start running at the same time.

Is there a simple framework which allows me to do this in Scala?


Solution

  • With the Apache Curator framework, and more specifically its DistributedDoubleBarrier recipe, you can place a barrier on each node that needs to be synchronized. You specify the number of nodes that must enter the barrier; each node blocks there until that many have arrived, and then they all continue.

    Assuming you have a (plain) ZooKeeper instance on example-zk:2181, you can use the following construct on all nodes:

    package test.barrier;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    class App {
        public App() {
            String zookeeperConnectionString = "example-zk:2181";
            // Must be identical on every node that takes part in this barrier.
            String barrierPath = "/unique-string";
    
            // Retry failed operations up to 3 times with exponential backoff,
            // starting from a 1000 ms base sleep time.
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
            client.start();
    
            // The last argument is the number of nodes that must enter before any may continue.
            DistributedDoubleBarrier distDBarrier = new DistributedDoubleBarrier(client, barrierPath, 3);
    
            try {
                System.out.println("Waiting on barrier");
                // Blocks until all 3 participants have called enter().
                distDBarrier.enter();
                System.out.println("Apparently everybody was aboard! Continuing ...");
            } catch (Exception e) {
                System.out.println("O no!: " + e.getMessage());
            }
        }
    
        public static void main(String[] args) {
            new App();
        }
    }
    

    Note that the barrier path is "used" once a run has gone through it, so for each run of your cluster you will probably need a fresh, unique path string. All nodes in the same run must of course agree on that string.
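
One way to get such a per-run path is to derive it from a run identifier that every node already receives, for example as a command-line argument from whatever launches the cluster. The sketch below assumes that setup. `BarrierPaths`, `barrierPathForRun`, the `/barriers` prefix and the fallback run ID are hypothetical names chosen for illustration; none of them are part of Curator.

```java
// Hypothetical helper (not part of Curator): builds a barrier path from a
// run identifier that is assumed to be the same on every node of one run.
final class BarrierPaths {

    // Assumed parent path under which all barrier paths are created.
    private static final String PREFIX = "/barriers";

    static String barrierPathForRun(String runId) {
        if (runId == null || runId.isEmpty()) {
            throw new IllegalArgumentException("run id must not be empty");
        }
        // A slash would silently add another path level, so reject it.
        if (runId.contains("/")) {
            throw new IllegalArgumentException("run id must not contain '/'");
        }
        return PREFIX + "/" + runId;
    }

    public static void main(String[] args) {
        // The run id is assumed to be handed to every node by the launcher.
        String runId = args.length > 0 ? args[0] : "example-run";
        System.out.println(barrierPathForRun(runId));
    }
}
```

The resulting string would then take the place of the hard-coded barrierPath in the example above. Because the path depends only on the run identifier, every node of a run computes the same path without extra coordination, and a new run gets a new path.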