Tags: parallel-processing, distributed-computing, priority-queue

How to implement a distributed priority queue without using ZooKeeper?


I want to implement a distributed priority queue without using ZooKeeper. How can I do this?


Solution

  • If you know how to communicate between a client and a server (e.g. over TCP sockets), this is fairly straightforward. The server holds a thread-safe implementation of the priority queue and exposes it through an "interface". Clients connect to the server and use this "interface".

    Server

    The server must provide a priority queue interface (i.e. support add, peek, poll, ...). These methods must be thread-safe, because each client is served by its own thread. So we will use PriorityBlockingQueue (which is thread-safe) instead of PriorityQueue.

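    import java.io.IOException;
    import java.net.ServerSocket;
    import java.util.concurrent.PriorityBlockingQueue;

    public class Server {
        private final ServerSocket server_skt;
        private final PriorityBlockingQueue<Integer> pq;

        // Constructor: opens the listening socket and creates the shared queue.
        // pq_size is only the initial capacity; PriorityBlockingQueue is unbounded.
        Server(int port, int pq_size) throws IOException {
            this.server_skt = new ServerSocket(port);
            this.pq = new PriorityBlockingQueue<>(pq_size);
        }

        public static void main(String[] argv) throws IOException {
            Server server = new Server(5555, 20); // Make server instance

            while (true) {
                // Always wait for new clients to connect
                try {
                    System.out.println("Waiting for a client to connect...");
                    // Spawn a new thread for communication with each client
                    new CommunicationThread(server.server_skt.accept(), server.pq).start();
                } catch (IOException e) {
                    // Print the exception itself; getStackTrace() would only print an array reference
                    System.out.println("Exception occurred: " + e);
                }
            }
        }
    }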
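
As a side note on the thread-safety requirement, here is a minimal sketch of several threads adding to one shared PriorityBlockingQueue without any extra locking, which is the situation the server is in when several clients are connected. The class name PbqDemo and the method fillAndDrain are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

class PbqDemo {
    // Fills one shared queue from several threads, then drains it in priority order.
    static List<Integer> fillAndDrain(int producers, int perProducer) {
        PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int base = p * perProducer;
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    pq.add(base + i); // safe without external synchronization
                }
            });
            threads.add(t);
            t.start();
        }
        try {
            for (Thread t : threads) {
                t.join(); // wait until every producer has finished
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<Integer> drained = new ArrayList<>();
        Integer next;
        while ((next = pq.poll()) != null) { // poll() returns null once the queue is empty
            drained.add(next);
        }
        return drained;
    }
}
```

With a plain PriorityQueue the concurrent add() calls could corrupt the heap or lose elements; here every element should come back exactly once, smallest first.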
    

    And this is what the CommunicationThread class could look like:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.concurrent.PriorityBlockingQueue;

    public class CommunicationThread extends Thread {
        // Assumption (not part of the original protocol): 255 is sent when poll() finds the queue empty
        private static final int EMPTY = 255;

        private final Socket client_socket;
        private final PriorityBlockingQueue<Integer> pq;

        public CommunicationThread(Socket socket, PriorityBlockingQueue<Integer> pq) {
            this.client_socket = socket;
            this.pq = pq;

            System.out.println("Client connected : " + client_socket.getInetAddress());
        }

        @Override
        public void run() {
            // try-with-resources closes the streams and the socket, even if an error occurs
            try (Socket socket = client_socket;
                 InputStream client_in = socket.getInputStream();
                 OutputStream client_out = socket.getOutputStream()) {
                boolean active = true;
                while (active) {
                    // read() returns one byte (0-255), or -1 at end of stream --> dispatch to correct method
                    int message_number = client_in.read();

                    switch (message_number) {
                    case -1: case 0:
                        // Will stop the communication between client and server
                        active = false;
                        break;
                    case 1:
                        // Add
                        int element_to_add = client_in.read(); // read element to add (a single byte)
                        if (element_to_add == -1) {
                            active = false; // client disconnected before sending the argument
                        } else {
                            pq.add(element_to_add); // Note that a real implementation would send the answer back to the client
                        }
                        break;
                    case 2:
                        // Poll (no extra argument to read)
                        Integer res = pq.poll(); // null if the queue is empty

                        // Write result to client
                        client_out.write(res == null ? EMPTY : res);
                        client_out.flush();
                        break;

                    /*
                     * IMPLEMENT REST OF INTERFACE (don't worry about synchronization, PriorityBlockingQueue methods are already thread safe)
                     */

                    }
                }
            } catch (IOException e) {
                System.out.println("Communication with client failed: " + e);
            }
        }
    }
    

    This class listens to what the client sends. Depending on the message, the server knows what to do, so there is a mini protocol: when the client wants to invoke a method of the distributed priority queue, it sends an integer code (e.g. 2 = poll()). The server reads that code and knows which method to invoke.

    Note that sometimes sending one integer is enough (see the poll() example), but not always. Think, for example, of add(), which needs an argument. The server will receive 1 from the client (i.e. add()) and will then read a second integer (or any other object that has to be stored in the distributed priority queue). Keep in mind that InputStream.read() and OutputStream.write(int) transfer a single byte, so the code shown here only handles values from 0 to 255.
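
Since InputStream.read() and OutputStream.write(int) only move one byte at a time, one possible way to carry full 32-bit integers is to wrap the streams in DataOutputStream and DataInputStream. The sketch below encodes an add() request as one opcode byte followed by a 4-byte argument. The class WireFormat and its methods are hypothetical names, and byte arrays stand in for the socket streams so the example is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class WireFormat {
    static final int ADD = 1; // same opcode as in the protocol described above

    // Encodes an add() request: one opcode byte followed by a 4-byte big-endian int.
    static byte[] encodeAdd(int element) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(ADD);
            out.writeInt(element);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes an add() request and returns its argument.
    static int decodeAdd(byte[] message) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(message));
            int opcode = in.readUnsignedByte();
            if (opcode != ADD) {
                throw new IOException("Unexpected opcode: " + opcode);
            }
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On a real connection the same writeByte/writeInt and readUnsignedByte/readInt calls would be applied to streams wrapped around socket.getOutputStream() and socket.getInputStream().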

    Client

    Based on the protocol, the server offers the client an interface (e.g. 0 = stop communication, 1 = add(), ...). The client only has to connect to the server and send it messages that respect the protocol.

    A client example:

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.Socket;

    public class PQ_Client {
        private final Socket skt;
        private final InputStream in;
        private final OutputStream out;

        private static final int _STOP_ = 0, _ADD_ = 1, _POLL_ = 2; // By convention (protocol)

        // Throws IOException if the connection fails, so callers never get a half-initialized client
        PQ_Client(String ip, int port) throws IOException {
            this.skt = new Socket(ip, port);
            this.in = skt.getInputStream();
            this.out = skt.getOutputStream();

            System.out.println("Connected to distributed priority queue.");
        }

        // Sort of stub functions
        public void stop() throws IOException {
            out.write(_STOP_);
            out.flush();
            skt.close(); // Closing the socket also closes both streams
        }

        public void add(Integer el) throws IOException {
            out.write(_ADD_); // Send wanted operation
            out.write(el);    // Send argument element (only the low 8 bits are transmitted)

            // Real implementation would listen for result here

            out.flush();
        }

        public int poll() throws IOException {
            out.write(_POLL_);
            out.flush();

            // Listen for answer (one byte, or -1 if the server closed the connection)
            return in.read();
        }

        /*
         * Rest of implementation
         */
    }
    

    Note that thanks to these self-made "stub functions", we can create a PQ_Client object and use it as if it were a priority queue (the client/server communication is hidden behind the stubs).

    String ip = "...";
    int port = 5555;
    
    PQ_Client pq = new PQ_Client(ip , port);    
    pq.add(5);
    pq.add(2);
    pq.add(4);
    
    int res = pq.poll();
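
To try the whole round trip without two machines, the sketch below runs a tiny server and a client in the same process over the loopback interface, using the same one-byte protocol (0 = stop, 1 = add, 2 = poll). LoopbackDemo, its method names, and the EMPTY marker (255 for "queue is empty") are assumptions made for this illustration, not part of the code above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.PriorityBlockingQueue;

class LoopbackDemo {
    static final int STOP = 0, ADD = 1, POLL = 2; // opcodes from the protocol above
    static final int EMPTY = 255;                 // hypothetical marker for an empty queue

    // Serves a single client until it sends STOP or closes the connection.
    static void serve(ServerSocket server, PriorityBlockingQueue<Integer> pq) {
        try (Socket client = server.accept();
             InputStream in = client.getInputStream();
             OutputStream out = client.getOutputStream()) {
            int op;
            while ((op = in.read()) > STOP) { // -1 (closed) and 0 (STOP) end the loop
                if (op == ADD) {
                    pq.add(in.read()); // sketch only: assumes the argument byte always arrives
                } else if (op == POLL) {
                    Integer head = pq.poll();
                    out.write(head == null ? EMPTY : head);
                    out.flush();
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Starts a server on an ephemeral loopback port, adds the values, then polls once.
    static int addAllThenPoll(int... values) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback)) {
            Thread serverThread = new Thread(() -> serve(server, new PriorityBlockingQueue<>()));
            serverThread.start();
            int result;
            try (Socket skt = new Socket(loopback, server.getLocalPort());
                 OutputStream out = skt.getOutputStream();
                 InputStream in = skt.getInputStream()) {
                for (int v : values) {
                    out.write(ADD);
                    out.write(v);
                }
                out.write(POLL);
                out.flush();
                result = in.read(); // blocks until the server answers
                out.write(STOP);
                out.flush();
            }
            serverThread.join();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Calling LoopbackDemo.addAllThenPoll(5, 2, 4) mirrors the usage shown above: the poll should return the smallest of the three elements.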
    

    Note that using RPC (Remote Procedure Call) could make this easier (stub functions are generated automatically, ...). In fact, what we implemented above is a small RPC-like mechanism: it does nothing more than send a message to call a procedure (e.g. add()) on the server, serialize the result (not needed for integers), and send it back to the client.
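
To give a rough idea of what "stub functions generated automatically" can mean, here is a minimal sketch based on java.lang.reflect.Proxy: every call on the stub is turned into a (method name, arguments) message and handed to a dispatcher. The interface RemoteQueue and the class StubDemo are hypothetical, and the dispatcher calls a local queue directly where a real RPC layer would send the message over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical interface describing the operations of the remote queue.
interface RemoteQueue {
    void add(int element);
    Integer poll();
}

class StubDemo {
    // Stands in for the server side: maps a method name plus arguments onto the real queue.
    static Object dispatch(PriorityBlockingQueue<Integer> pq, String method, Object[] args) {
        switch (method) {
            case "add":
                pq.add((Integer) args[0]);
                return null;
            case "poll":
                return pq.poll(); // null if the queue is empty
            default:
                throw new UnsupportedOperationException(method);
        }
    }

    // Builds a stub whose calls are all forwarded as (method name, arguments) messages.
    static RemoteQueue newStub(PriorityBlockingQueue<Integer> serverQueue) {
        InvocationHandler handler = (proxy, method, args) ->
                dispatch(serverQueue, method.getName(), args);
        return (RemoteQueue) Proxy.newProxyInstance(
                RemoteQueue.class.getClassLoader(),
                new Class<?>[] { RemoteQueue.class },
                handler);
    }
}
```

The caller only sees a RemoteQueue; no add() or poll() stub had to be written by hand, which is the convenience an RPC framework provides on top of the hand-rolled protocol.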