Tags: java, spring, spring-boot, producer-consumer, java.util.concurrent

Thread blocking even when LinkedBlockingQueue is empty


To keep the investigation simple, I have only one thread producing entities and one thread consuming them. The two threads share a LinkedBlockingQueue. After taking an entity from the queue, the consumer passes it on to another thread that saves it to the DB. The producing thread stops working after a few iterations of entities being inserted into and removed from the queue. Debug logging makes it look as if the queue blocks the insert operation even when the queue is empty or has enough free space.

Producer code:

         final BlockingQueue<Entity> queue = new LinkedBlockingQueue<>(8); //located in calling method

         ....................................................................................

         do {
                List<Entity> entityList = entityDatasource.getEntity();

                for (Entity entity: entityList) {
                    try {
                        log.debug("Size before insert operation is: " + queue.size());
                        queue.put(entity);
                        log.debug("Size after insert operation is: " + queue.size());
                    } catch (InterruptedException ex) {
                        ...
                    }
                }
             } while (atomicBool.get());

Consumer code:

        CompletableFuture<Void> queueHandler = CompletableFuture.runAsync(() -> {

            do {
                try {
                    log.debug("Queue size is: " + queue.size());
                    Entity entity = queue.take();
                    log.debug("Queue size is: " + queue.size());
                    storeInDb(entity);

                } catch (InterruptedException ex) {
                  ...
                }
            } while (atomicBool.get());

        }, asyncPoolQueueHandler); //ThreadPoolTaskExecutor

        List<CompletableFuture<Void>> pool = new ArrayList<>();
        IntStream.range(0, 1).forEach(i -> {
            pool.add(queueHandler);
        });
        CompletableFuture.allOf(pool.toArray(CompletableFuture[]::new));

DB store:

        CompletableFuture
                .supplyAsync(() -> {
                    return entityRep.save(entity);
                }, asyncPoolDbPerformer).join(); //ThreadPoolTaskExecutor

VisualVM screenshot

I watched the application in VisualVM, but nothing looked unexpected to me: when the producer gets stuck, the other parts of the pipeline are idle as well. I would be grateful for advice on how to resolve this issue.


Solution

  • The problem was a flawed design. Producer-consumer was not the right approach here. A more appropriate way is a synchronous blocking pipeline, scaled according to the throughput of the bottleneck. In my case I'm bound by the performance of the database connection pool.

    (dataSource->businessLogic->dataDestination) x N

    where N is the scale factor
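
A minimal sketch of the (dataSource->businessLogic->dataDestination) x N layout described above, not the author's actual code. The class name ScaledPipeline, the run method, and the convention that the source returns null when it has nothing left are all assumptions made for illustration, and the real entityDatasource and entityRep are replaced by in-memory queues. Each of the N workers fetches an item, processes it, and stores it synchronously on its own thread, so there is no shared queue and no hand-off to a second thread pool.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical helper: runs N independent copies of a blocking pipeline.
public class ScaledPipeline {

    /**
     * Starts n workers. Each worker repeats source -> logic -> sink on its own
     * thread until the source returns null (an assumed "no more data" signal).
     * The source and sink must be safe to call from several threads.
     */
    public static <I, O> void run(int n,
                                  Supplier<I> source,
                                  Function<I, O> logic,
                                  Consumer<O> sink) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            CompletableFuture<?>[] workers = new CompletableFuture<?>[n];
            for (int i = 0; i < n; i++) {
                workers[i] = CompletableFuture.runAsync(() -> {
                    I item;
                    // Every step blocks the same worker thread; nothing is queued.
                    while ((item = source.get()) != null) {
                        sink.accept(logic.apply(item));
                    }
                }, pool);
            }
            // Wait for all N pipelines to drain the source.
            CompletableFuture.allOf(workers).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the data source and the database.
        Queue<Integer> input = new ConcurrentLinkedQueue<>(List.of(1, 2, 3, 4, 5));
        Queue<Integer> stored = new ConcurrentLinkedQueue<>();

        // N = 3 here; in practice N would be sized to the bottleneck,
        // for example the database connection pool.
        ScaledPipeline.<Integer, Integer>run(3, input::poll, x -> x * 10, stored::add);

        System.out.println("Stored items: " + stored.size());
    }
}
```

Because each worker only starts a new item after its previous save has returned, throughput is bounded by N times the speed of the slowest step, which is why N is chosen to match the bottleneck.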