java multithreading synchronization blockingqueue

Wait till a Blocking Queue is full


I'm looking for a way to synchronize multiple asynchronous operations. I'd like to use a BlockingQueue with a capacity equal to the number of my operations, but how can I wait until the queue is full?

I'm looking for something like a reversed BlockingQueue.

I need to gather the results of each thread at the end. The AsyncHandler is fixed; it already has a thread executor underneath, so I cannot start new threads.

//3 Times
makeAsync(new AsyncHandler() {
   onSuccess() {
     ..
     queue.put(result)
   }

   onFailure() {
     ..
   }
});

//Blocking till the Queue is full
List<Results> results = queue.takeAll()

Bonus question: I need a way to end the wait when one of my requests fails.


Solution

  • What you describe with

    //Blocking till the Queue is full
    List<Results> results = queue.takeAll();
    

    does not differ semantically from “take as many items as the queue’s capacity”. If you know the capacity, you can achieve this with:

    // preferably a constant which you also use to construct the bounded queue
    int capacity;
    

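    List<Results> results = new ArrayList<>(capacity);
    while (results.size() < capacity) {
        // take() blocks until an item is available (it may throw
        // InterruptedException), so the loop does not spin on an empty queue
        results.add(queue.take());
        // collect whatever else has arrived in the meantime, without blocking
        queue.drainTo(results, capacity - results.size());
    }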
    

    This will block until it has received as many items as the capacity, which is, as said, the same as waiting for the queue to become full (to have a size equal to its capacity) and then taking all items. The only difference is that the event of the queue becoming full is not guaranteed to happen, because the consumer may remove items before the queue ever fills up. So if you intend your async operations to offer items until the queue is full, it does not work this way.

    If you don’t know the capacity, you are out of luck. There is not even a guarantee that an arbitrary BlockingQueue is bounded; that is, it might have unlimited capacity.
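
    Whether a particular queue looks bounded can at least be probed through remainingCapacity(), which the BlockingQueue interface documents as returning Integer.MAX_VALUE when there is no intrinsic limit. The following is only a rough sketch: the names CapacityProbe, estimatedCapacity and looksUnbounded are made up for illustration, and the estimate is racy if other threads modify the queue while it is being computed.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of the JDK.
class CapacityProbe {

    // size() + remainingCapacity() approximates the total capacity.
    // Computed as long so that the sum cannot overflow.
    static long estimatedCapacity(BlockingQueue<?> queue) {
        return (long) queue.size() + queue.remainingCapacity();
    }

    // Heuristic: an estimate of Integer.MAX_VALUE or more means there is
    // no practical bound to wait for.
    static boolean looksUnbounded(BlockingQueue<?> queue) {
        return estimatedCapacity(queue) >= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(3);
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        System.out.println(estimatedCapacity(bounded)); // 3
        System.out.println(looksUnbounded(unbounded));  // true
    }
}
```

    Even when the estimate is finite, it only tells you how many items to wait for; it says nothing about whether that many items will ever arrive.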

    On the other hand, if the asynchronous operations are able to detect when they have finished, they could simply collect the items in a list locally and put the entire list into a BlockingQueue<List<Results>> as a single item once they are done. Then your code waiting for it needs only a single take to get the entire list.
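
    The last idea could be sketched roughly as follows. This is one possible reading, not the only way to do it: ResultCollector, onSuccess and awaitAll are hypothetical names, the number of expected operations is assumed to be known up front, and a plain thread pool stands in for the fixed AsyncHandler machinery from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical helper: gathers results locally and hands over one complete list.
class ResultCollector<T> {
    private final int expected;
    private final List<T> collected = new ArrayList<>();
    // Holds at most one item: the complete list of results.
    private final BlockingQueue<List<T>> finished = new ArrayBlockingQueue<>(1);

    ResultCollector(int expected) {
        this.expected = expected;
    }

    // Meant to be called from each success callback; synchronized so that
    // callbacks running on different threads do not corrupt the list.
    synchronized void onSuccess(T result) {
        collected.add(result);
        if (collected.size() == expected) {
            // The last operation has finished: publish the whole list as a single item.
            finished.offer(new ArrayList<>(collected));
        }
    }

    // Blocks until all expected results have arrived; a single take() is enough.
    List<T> awaitAll() throws InterruptedException {
        return finished.take();
    }

    public static void main(String[] args) throws InterruptedException {
        ResultCollector<String> collector = new ResultCollector<>(3);
        ExecutorService pool = Executors.newFixedThreadPool(3); // stand-in for the async framework
        for (int i = 0; i < 3; i++) {
            final int id = i;
            pool.execute(() -> collector.onSuccess("result-" + id));
        }
        List<String> results = collector.awaitAll();
        pool.shutdown();
        System.out.println(results); // the three results, in arrival order
    }
}
```

    Note that this sketch never publishes anything if fewer than the expected number of operations succeed, so the waiting thread would block indefinitely in that case.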