Search code examples
javaspringmultithreadingspring-bootblockingqueue

Blocking Queue poll method retuns null during multiple threads


We have implemented multithreading in our application using blockingqueue, executor framework and future.

When ever user asks for some data, we submit a task to the executor framework which connects to database, queries data and streams the data back.

We have a method, Which reads this data and writes to servletoutputstrean.


public long writeData(ServletOutputStream sos, BlockingQueue < T > blockingQueue, Future < Boolean > future) {

    try (final JsonWriter writer = new JsonWriter(new OutputStreamWriter(sos, UTF_8))) {

        int counter = 0;
        while (true) {
            Optional.ofNullable(blockingqueue.poll()).ifPresent(entityobj - > {
                gson.toJson(entityobj, entityobj.getclass(), writer);
                Counter++;
            });



            if (blockingqueue.isEmpty() && future.isDone() && future.get()) {
                if (count == 0) {
                    Log.error("data not read properly");
                }
                break;
            }
        }
    } catch (Exception e) {
        log.error(e);
    }
    return counter;
}

When blockingqueue.poll() is being executed, there are times data is still not loaded by the repositorythread. By the time the next if block comes, blocking queue is empty and future is completed so control gets out of while loop.

No response is written to stream. Anyway to handle this weird behavior This doesn't happen when there are lot of records.


Solution

  • When blockingqueue.poll() is being executed, there are times data is still not loaded by the repositorythread. By the time the next if block comes, blocking queue is empty and future is completed so control gets out of while loop.

    You have a race condition in your code which is complicating matters:

    if (blockingqueue.isEmpty() && /* race here */ future.isDone() && future.get()) {
    

    There is a possibility that the blockingqueue.isEmpty() returns true and then the job finishes before the future.isDone() call happens causing the code to quit prematurely with an element left in the queue.

    An "end of stream" object that @Willis mentioned is a good option but a simple solution would be to refactor your code like:

    boolean futureDone = future.isDone();
    entity = blockingqueue.poll();
    if (entity == null) {
       if (futureDone) {
          break;
       }
    } else {
       // process entity
    }
    

    This ensures that you always check to see if the future is done before getting the last item from the blocking-queue which removes the race. You might have to do a poll one more time but that fine.

    Btw, if this code is really spinning, you should put some sort of timeout in poll to slow it down and not eat a CPU:

    entity = blockingqueue.poll(50, TimeUnit.MILLISECONDS);