
How do I handle multiple client requests which need to be combined to create a response?


I'm using JAX-RS to handle client requests. The scenario is this: clients are divided into groups of, let's say, 5. Each client needs to submit a number to the server, and when all 5 clients that belong to the same group have submitted their numbers, the server calculates the sum of these numbers and returns the result to all of them.

Because a client that submits a number has to wait for the others to do the same (unless it is the last one), I'm using the asynchronous response API, which allows waiting for the other clients without overloading the server. Let's say there's no timeout limit.

My lack of understanding of the inner workings of JAX-RS, sessions and contexts is making it difficult for me to design the proper solution to this. Here is what I have in mind (in hybrid code-pseudocode):

public class SummationResource {

    @Inject
    SummationService service; // talks with the entity manager

             //group    client  number
    static Map<Long, Map<Long, Integer>> groupMap;

    @POST
    @Path("{clientId}")
    public void submit(int num,
                        @PathParam("clientId") long clientId,
                        @Suspended AsyncResponse response) {
        new Thread() {
            @Override
            public void run() {
                long groupId = service.getGroupOfClient(clientId);
                Map<Long, Integer> clients = groupMap.get(groupId);
                clients.put(clientId, num);
                if (clients.size() != 5)
                    // wait (see points below)
                else
                    // release all threads of this group
                int sum = HelperClass.sum(clients.values()); // should be executed only once
                response.resume(sum);
            }
        }.start();
    }
}

The points I want to touch:

  • I know that I can store it in a DB instead of in memory, but let's say the time between client submissions is short and that writing to and reading from the DB is more expensive. Is groupMap different for each invocation of submit (by different clients)? Is making it static good enough to have it shared across all instances?
  • For synchronizing the threads I can make a Map<Long, Object> locks which maps a groupId to a lock object. Each thread then waits by calling locks.get(groupId).wait(), and the release line is locks.get(groupId).notifyAll(). The lock object is created by the first thread that belongs to a group. I don't know if this approach is friendly to JAX-RS and AsyncResponse. Wait-notify is also not used much today, so any newer API that works for this purpose is welcome. Perhaps ExecutorService and Future?
  • I'm not sure how to combine the numbers one time and return the result to all threads. Usually I would join them but I can't do it outside the thread because that will block the synchronous response which defeats the whole idea.

I'm looking for a design that addresses the requests and points above similar to what I did in my code.


Solution

    1. groupMap is static, so all calls share the same instance for as long as the application lives. However, the map is only declared and never instantiated. Java offers synchronized collections, so individual calls such as get and put are thread-safe without extra locking on your side:

      static Map<Long, Map<Long, Integer>> groupMap = Collections.synchronizedMap( new HashMap<>());
      

    You should also populate it with an inner map for each group before use (currently clients would be null).

    2. For the wait I recommend using CompletableFuture, which can carry a result of a type you choose. Keep one future per waiting client in a shared map:

      Map<Long, CompletableFuture<Integer>> futures = Collections.synchronizedMap( new HashMap<>());
      

    Each thread that must wait (clients.size() < 5) adds its own future to this map:

        CompletableFuture<Integer> future = new CompletableFuture<>();
        futures.put(clientId, future);
        int sum = future.join(); // blocks until another thread completes the future
    

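    The fifth thread, which calculates the sum, then completes all of these futures, passing the calculated sum. Note that this line completes every future in the map, so if several groups are active at once the futures need to be tracked per group.

        futures.values().forEach( f -> f.complete( sum));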
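Putting the two points together, here is a minimal sketch of one possible variant, not a definitive implementation. It assumes one CompletableFuture per group rather than per client, and it registers a callback on that future instead of blocking a thread. The names GroupSummation, Group, GROUP_SIZE, submit and onSum are hypothetical and chosen only for illustration; the callback stands in for whatever resumes the suspended response.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical helper; none of these names come from the original post.
class GroupSummation {
    static final int GROUP_SIZE = 5; // assumption: fixed group size, as in the question

    // Per-group state: numbers submitted so far and the future that will carry the sum.
    static final class Group {
        final Map<Long, Integer> numbers = new HashMap<>(); // guarded by synchronized (group)
        final CompletableFuture<Integer> sum = new CompletableFuture<>();
    }

    // groupId -> group state; computeIfAbsent creates each entry exactly once.
    static final Map<Long, Group> groups = new ConcurrentHashMap<>();

    static void submit(long groupId, long clientId, int num, Consumer<Integer> onSum) {
        Group group = groups.computeIfAbsent(groupId, id -> new Group());

        // Register the callback first so the last client is notified as well.
        group.sum.thenAccept(onSum);

        Integer total = null;
        synchronized (group) {
            // Put and size check happen under one lock, so only one
            // submission can observe the group becoming full.
            group.numbers.put(clientId, num);
            if (group.numbers.size() == GROUP_SIZE) {
                total = group.numbers.values().stream()
                        .mapToInt(Integer::intValue).sum();
            }
        }
        if (total != null) {
            // Runs every registered callback; later calls to complete() are ignored.
            group.sum.complete(total);
        }
    }
}
```

In the resource method, the callback would presumably be something like response::resume, so no extra thread has to be started or parked while a group is still incomplete. The sketch does not remove finished groups or handle timeouts; both would need to be added for real use.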