java multithreading concurrency consumer producer

Consumer producer pattern where producer waits for all consumers to complete before retriggering itself?


I'm trying to figure out the best approach in Java (any version) to implement a single-producer, multiple-consumer setup using ExecutorService (by preference, not need). The producer needs to run "forever", but each time it runs it must wait until everything has been fully processed: all consumer threads have terminated, the queue is empty, and no items are left to be produced. The producer should also poll its data source only at a fixed interval.

So as an example: every 30 minutes I want my producer to poll its data source for records and submit them to the queue. If the consumers take more than 30 minutes to process them, I want the producer to wait until all the items have been processed before polling its data source again (which it would then do immediately, since 30 minutes have already elapsed).

Not looking for someone to write my code for me. Some basic hints/guidance would be really appreciated.

Here is a shortened example implementation I'm trying to work from. I've taken out all of my horrible attempts at solving the problem. Note that the hard coded parameters for constructing the ThreadPoolExecutor will eventually be dynamic.

import java.util.concurrent.*;
public class ItemProcessorService {
    public static void main(String args[]) throws InterruptedException {
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        int corePoolSize = 5,
                maxPoolSize = 10,
                keepAlive = 10,
                queueCapacity = 1;
        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAlive, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity), threadFactory, rejectionHandler);

        for (int i = 0; i < 10; i++) {
            executor.execute(new ItemConsumer());
        }
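        executor.shutdown();
        // Block until all tasks have finished instead of spinning on isTerminated()
        while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
            // still running; keep waiting
        }
        System.out.println("Executor finished");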
    }
}

class ItemConsumer implements Runnable {
    @Override
    public void run() {
        processItem();
    }

    private void processItem() {
        try {
            Thread.sleep(3000);
            System.out.println(Thread.currentThread().getName() + " - processed item");
        } catch (InterruptedException e) {
            // restore the interrupt flag so the pool can see the interruption
            Thread.currentThread().interrupt();
        }
    }
}

class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        System.out.println(r.toString() + " - rejected");
    }
}

Solution

  • You can solve this problem nicely by creating an ExecutorService for every run of the producer. The producer creates it, shuts it down, and also waits for its termination.

    For scheduling the producer, use either ScheduledExecutorService.scheduleWithFixedDelay(...) or ScheduledExecutorService.scheduleAtFixedRate(...), depending on what you want:

    • scheduleWithFixedDelay(...) keeps the specified amount of time between the end of one run and the start of the next, so no matter how long a run lasts, the next one follows after that fixed delay:

      ...|XXXXXX|.............|XXXXX|.............|XXXXXXXXX|.............|X
                 <--- fix --->       <--- fix --->           <--- fix --->

    • scheduleAtFixedRate(...) tries to keep the scheduling rate, so if a producer run takes longer, the idle time before the next run shrinks (and a run that overruns the period makes the next one start late), but two runs will never overlap:

      ...|XXXXXXXX|...|XXXXX|......|XXXXXXXXXXXXXXX||XXX|....|XX
         <--- fix ---><--- fix ---><--- fix ---><--- fix ---><-
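
    The difference between the two modes can be observed with a small sketch. It is only an illustration under assumptions: the class name SchedulingModesDemo, the helper startGaps, and the millisecond timings (standing in for minutes) are made up for this example. The task deliberately takes longer (150 ms) than the interval (100 ms).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and timings are assumptions for illustration.
public class SchedulingModesDemo {

    /** Runs a 150 ms task on a 100 ms schedule for about one second and
     *  returns the gaps (in ms) between the starts of consecutive runs. */
    public static List<Long> startGaps(boolean fixedRate) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Long> starts = new CopyOnWriteArrayList<>();
        Runnable task = () -> {
            starts.add(System.nanoTime());
            try {
                Thread.sleep(150); // simulated work that outlasts the 100 ms interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        if (fixedRate) {
            scheduler.scheduleAtFixedRate(task, 0, 100, TimeUnit.MILLISECONDS);
        } else {
            scheduler.scheduleWithFixedDelay(task, 0, 100, TimeUnit.MILLISECONDS);
        }
        Thread.sleep(1000);
        scheduler.shutdownNow(); // cancels the periodic task and interrupts a running one
        scheduler.awaitTermination(1, TimeUnit.SECONDS);

        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < starts.size(); i++) {
            gaps.add(TimeUnit.NANOSECONDS.toMillis(starts.get(i) - starts.get(i - 1)));
        }
        return gaps;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fixed delay gaps (ms): " + startGaps(false));
        System.out.println("fixed rate gaps (ms):  " + startGaps(true));
    }
}
```

    With fixed delay, the gaps should come out at roughly run time plus delay (about 250 ms here). With fixed rate, the runs should follow each other back to back (about 150 ms here), because each run overruns the period but the runs never overlap.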

    A simple example of what the producer might look like:

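    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;

    public class Producer implements Runnable {
        @Override
        public void run() {
            ExecutorService executor = ... // create new executor

            // queue items
            for (Object item : itemSource) {
                executor.execute(new Consumer(item));
            }

            // shutdown executor: no new tasks are accepted, queued ones still run
            executor.shutdown();
            try {
                // wait until all consumers have finished (or the timeout expires)
                executor.awaitTermination(2, TimeUnit.HOURS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
        }
    }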
    
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    scheduler.scheduleWithFixedDelay(new Producer(), 30, 30, TimeUnit.MINUTES);
    // or
    scheduler.scheduleAtFixedRate(new Producer(), 30, 30, TimeUnit.MINUTES);
    

    Of course, you will have to do proper exception handling: any exception that escapes a producer run uncaught will stop the scheduler from re-scheduling it.
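
    One way to guard a producer run against this is sketched below. It is a minimal illustration, not a definitive implementation: the class name GuardedProducer, the Supplier-based item source, the pool size of 4, the counters, and the demo method are all assumptions made for the example. The demo uses a source that fails on its first poll to show that later runs still happen.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical producer; the source, pool size and counters are assumptions.
public class GuardedProducer implements Runnable {
    private final Supplier<List<String>> itemSource; // stand-in for the real data source
    final AtomicInteger processed = new AtomicInteger();
    final AtomicInteger failedRuns = new AtomicInteger();

    public GuardedProducer(Supplier<List<String>> itemSource) {
        this.itemSource = itemSource;
    }

    @Override
    public void run() {
        try {
            List<String> items = itemSource.get(); // may throw
            ExecutorService executor = Executors.newFixedThreadPool(4); // fresh pool per run
            try {
                for (String item : items) {
                    executor.execute(() -> consume(item));
                }
            } finally {
                executor.shutdown(); // always release the pool, even if queuing failed
            }
            if (!executor.awaitTermination(2, TimeUnit.HOURS)) {
                executor.shutdownNow(); // give up on consumers that are still running
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (RuntimeException e) {
            // Swallow (and, in real code, log) so the scheduler keeps re-scheduling.
            failedRuns.incrementAndGet();
        }
    }

    private void consume(String item) {
        processed.incrementAndGet(); // stand-in for real consumer work
    }

    /** Schedules a producer whose source fails on the first poll;
     *  returns {processed items, failed runs}. */
    public static int[] demo() throws InterruptedException {
        AtomicInteger polls = new AtomicInteger();
        GuardedProducer producer = new GuardedProducer(() -> {
            if (polls.getAndIncrement() == 0) {
                throw new IllegalStateException("source unavailable");
            }
            return Arrays.asList("a", "b", "c");
        });
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(producer, 0, 50, TimeUnit.MILLISECONDS);
        Thread.sleep(400);
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        return new int[] {producer.processed.get(), producer.failedRuns.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] result = demo();
        System.out.println("processed=" + result[0] + ", failedRuns=" + result[1]);
    }
}
```

    Without the catch block for RuntimeException, the first failing poll would end the periodic schedule and no items would ever be processed.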

    Please note that creating an executor might be expensive, so this approach is only appropriate if consuming the items is far more expensive than creating the executor (which seems to be the case in your situation).
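
    If the cost of creating an executor does matter, a different technique from the one above is to keep a single long-lived pool and wait for each batch with ExecutorService.invokeAll(...), which returns only after all submitted tasks have completed. The sketch below is an assumption-laden illustration: the class name ReusedPoolBatch, the helper processBatch, and the upper-casing "work" are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical alternative: one shared pool, one blocking batch per producer run.
public class ReusedPoolBatch {

    /** Submits one batch to a shared pool and blocks until every item is processed. */
    public static List<String> processBatch(ExecutorService pool, List<String> items)
            throws InterruptedException, ExecutionException {
        List<Callable<String>> tasks = new ArrayList<>();
        for (String item : items) {
            tasks.add(() -> item.toUpperCase()); // stand-in for real consumer work
        }
        List<String> results = new ArrayList<>();
        // invokeAll returns the futures in task order once all tasks are done
        for (Future<String> done : pool.invokeAll(tasks)) {
            results.add(done.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4); // created once, reused
        try {
            System.out.println(processBatch(pool, Arrays.asList("a", "b"))); // prints [A, B]
            System.out.println(processBatch(pool, Arrays.asList("c")));      // prints [C]
        } finally {
            pool.shutdown();
        }
    }
}
```

    A producer's run() method could call such a helper once per run. The trade-off is that the pool's threads stay alive between runs.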