Search code examples
asynchronouswebsocketipcmessage-queuedistributed-computing

Asynchronous web socket application server with two event loops


I'm trying to make a distributed RPC-type web application that uses websockets for its main interface. I want to use a queuing system (like RabbitMQ) in order to distribute the expensive jobs that are requested through the websocket connections.

Basically, the flow would go like this:

  1. A client sends a job via websocket connection to the server
  2. The server would send this message to a RabbitMQ exchange to be processed by a worker
  3. The worker would execute the job and add the result of the job to a response queue
  4. The server would check the response queue and send the result of the job back to the client via websocket connection.

As far as I can tell, on the server I need two event loops that share memory. The websocket server needs to be listening for incoming jobs, and a RabbitMQ consumer needs to be listening for job results to send back to the clients.

What's the appropriate technologies for me to use here? I've considered the following:

  • multithreading the application and starting one event loop on each thread
  • using two processes with shm (shared memory)
  • using two processes that communicate via socket (either a unix socket or maybe even set up the workers as special websocket clients)
  • hooking into the websocket server's event loop to check the result queue

I'm new to both websockets and distributed computing, so I really have no idea which of these (or maybe something I didn't think of) would work best for me.


Solution

  • As far as I can tell, on the server I need two event loops that share memory. The websocket server needs to be listening for incoming jobs, and a RabbitMQ consumer needs to be listening for job results to send back to the clients.

    Since you can have multiple clients sending jobs concurrently, you will need a multithreaded server. Unless your application would process client per client. Now there are multiple approaches to implement a multithreaded server, each with their own advantages/disadvantages. Take a look at multithreading through :

    1. A thread per request (+ : throughput potentially maximized, - : threads are expensive to create, must manage concurrency)
    2. A thread per client (+ : less thread management overhead, - : doesn't scale to many many connections, still manage concurrency)
    3. A thread pool (+ : Avoids overhead of thread creation, scalable up to N concurrent connections (N = size of thread pool), - : Manage concurrency between N threads)

    It's up to you to choose one of the above approaches (I would opt for a thread per client as it is relatively easy to implement and the chance that you'll have tens of thousands of clients is relatively small).

    Notice that this is a multithreaded approach and not an event-driven approach ! But since you are not limited to one thread (in which case it should be event driven in order to be able to process multiple clients "concurrently") I wouldn't go for that option as it is more difficult to implement. (Programmers sometimes speak about a "callback hell" in an event-driven approach).

    This is how I would implement it (one thread per client, Java) :

    Basically, the flow would go like this:

    1. A client sends a job via websocket connection to the server

    Server part :

    public class Server {
        private static ServerSocket server_skt;
        private static ... channel; // channel to communicate with the rabbitMQ distributed priority queue.
    
        // Constructor
        Server(int port) {
            server_skt = new ServerSocket(port);
            
            /*
             * Set up connection with the distributed queue
             * channel = ...;
             */
        }
    
        public static void main(String argv[]) {
            Server server = new Server(5555); // Make server instance
    
            while(true) {
                // Always waiting for new clients to connect
                try {
                    System.out.println("Waiting for a client to connect...");
                    // Spawn new thread for communication with client (hence one thread per client approach)
                    new CommunicationThread(server_skt.accept(), server.channel).start(); // Will listen for new jobs and send them
                } catch(IOException e) {
                    System.out.println("Exception occured :" + e.getStackTrace());
                }
            }
        }
    }
    
    1. The server would send this message to a RabbitMQ exchange to be processed by a worker
    2. ...
    3. The server would check the response queue and send the result of the job back to the client via websocket connection.
    public class CommunicationThread extends Thread {
        private Socket client_socket;
        private InputStream client_in;
        private OutputStream client_out;
        private ... channel; // Channel to communicate with rabbitMQ
        private ... resultQueue;
    
        public CommunicationThread(Socket socket, ... channel) { // replace ... by type of the rabbitMQ channel
            try {
                this.client_socket = socket;
                this.client_in = client_socket.getInputStream();
                this.client_out = client_socket.getOutputStream(); 
                this.channel = channel;
                this.resultQueue = ...;
    
                System.out.println("Client connected : " + client_socket.getInetAddress().toString());
            } catch(IOException e) {
                System.out.println("Could not initialize communication properly. -- CommunicationThread.\n");
            }
        }
        
        public yourJobType readJob() {
            // Read input from client (e.g. read a String from "client_in")
            // Make a job from it (e.g. map String to a job)
            // return the job
        }
    
        @Override
        public void run() {
            while(active) {
                
                /*
                 * Always listen for incoming jobs (sent by client) and for results (to be sent back to client)
                 */
                
                // Read client input (only if available, else it would be blocking!)
                if(client_in.available() > 0) {
                    yourJobType job = readJob();
                    channel.basicPublish(...); // Send job to rabbitMQ
                }
                
                /* Check result queue (THIS is why reading client input MUST be NON-BLOCKING! Else while loop could be blocked on reading input
                 * and the result queue won't be checked until next job arrives)
                 */
                
                ResultType next_result = resultQueue.poll(); // Could be "null" if the queue is empty
                if(next_result != null) {
                    // There is a result
                    client_out.write(next_result.toByteArray());
                    client_out.flush();
                }
            }
            
            client_in.close();
            client_out.close();
        }
    }
    

    Note that when reading from the result queue it is important that you only read results of jobs sent by that client.

    If you have one result queue containing the results of jobs (of all clients) and you retrieve a result like in the code above, then that result could be the result of a job of another client, hence sending the result back the the wrong client.

    To fix this you could poll() the result queue with a filter and a wildcard (*) or have a result queue for each client, hence knowing that a result retrieved from our queue wil be sent to the corresponding client.

    (*) : You could assign an ID to every client. When receiving a job from a client, pair the job with the client ID (e.g. in a tuple < clientID, job >) and put it in the queue. And do the same for the results (pair the result with the client ID and put it in the result queue). Then in the run() method of CommunicationThread you would have to poll the result queue only for results of the form < clientID, ? >.

    Important : You'll also have to assign an ID for every job! Because sending job A and then job B doesn't guarantee that result of job A will come before the result of job B. (Job B could be less time consuming then job A and thus the result could be sent back to the client before job A's result).

    (PS : It's up to you to see how to implement the workers (executed by server with one thread for each worker? Or executed by other processes?))


    The above answer is a possible, multithreaded solution. I only discussed the server part, the client part should send jobs and wait for results (how to implement this depends on your goals, do clients first send all jobs and then receive the results of each job or can this be mixed ?).

    There are other ways it could be implemented, but for a beginner in distributed computing I think this is the easiest solution (using thread pools, ... would make it trickier).