WorkerThread: Wait for processing done (BlockingQueue)


I am building a multithreaded application using WorkerThreads that process Tasks from BlockingQueues. The worker looks as follows (as an abstract class; subclasses implement process()).

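import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

abstract class WorkerThread extends Thread {
    BlockingQueue<Task> q;
    AtomicInteger tasksInSystem; // globally available; atomic because several workers update it

    @Override
    public void run() {
        try {
            while (!isInterrupted()) {
                Task t = q.take(); // blocks until a task is available
                try {
                    process(t);
                } finally {
                    tasksInSystem.decrementAndGet(); // also counts tasks whose processing threw
                }
            }
        } catch (InterruptedException e) {
            // interrupted while blocked in take(): fall through and let the thread end
        }
    }

    abstract void process(Task t);
}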

The special thing is that I'd like to wait for all tasks to complete. My first idea was to:

  • count each added task
  • decrease the counter when processing completed.

But there are different types of Tasks, different worker implementations, and multiple queues, so I would have to maintain tons of different counters.

What I'd like to have:

q.waitForEmptyAndCompleted()

That would require the queue to keep track of the Tasks "in flight" and require the worker threads to signal when they are done (instead of decrementing tasksInSystem).

The worker cannot increase that counter itself, because it could only count a task after taking it from the queue. Another thread may run right after the take() call, before the worker has increased the counter, and would then see an empty queue and a counter of zero although a task is still in flight.

Hence, the counter increase and take() must be tied together (atomically), which leads me to a specialized BlockingQueue.

I didn't find a premade solution. So my best guess is to implement my own BlockingQueue. Is there something that I could use instead (to avoid implementing and testing a thread-safe blocking queue on my own)? Or do you have any idea to implement that wait call differently?
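To make the waitForEmptyAndCompleted() idea concrete, here is a minimal sketch of a thin wrapper around an existing queue rather than a full BlockingQueue implementation. It sidesteps the take() race by counting a task as pending at put() time, so take() needs no bookkeeping at all. The names TrackingQueue, taskDone() and TrackingQueueDemo are made up for this illustration and are not part of any library.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: a task counts as pending from put() until taskDone().
class TrackingQueue<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final Object lock = new Object();
    private int pending; // queued + in-flight tasks, guarded by lock

    void put(T item) {
        synchronized (lock) {
            pending++; // count before the item becomes visible to workers
        }
        queue.offer(item); // the queue is unbounded, so offer() always succeeds
    }

    T take() throws InterruptedException {
        return queue.take(); // no counting here, so there is no race with the waiter
    }

    // Workers call this after they have finished processing a taken item.
    void taskDone() {
        synchronized (lock) {
            if (--pending == 0) {
                lock.notifyAll();
            }
        }
    }

    void waitForEmptyAndCompleted() throws InterruptedException {
        synchronized (lock) {
            while (pending > 0) {
                lock.wait();
            }
        }
    }
}

class TrackingQueueDemo {
    // Processes taskCount dummy tasks on three workers and returns how many
    // were processed, or -1 if the calling thread was interrupted while waiting.
    static int processAll(int taskCount) {
        TrackingQueue<Integer> q = new TrackingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        Thread[] workers = new Thread[3];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (true) {
                        q.take();
                        try {
                            processed.incrementAndGet(); // stands in for process(t)
                        } finally {
                            q.taskDone();
                        }
                    }
                } catch (InterruptedException e) {
                    // interrupted in take(): worker shuts down
                }
            });
            workers[i].setDaemon(true);
            workers[i].start();
        }
        for (int i = 0; i < taskCount; i++) {
            q.put(i);
        }
        try {
            q.waitForEmptyAndCompleted();
            return processed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            for (Thread w : workers) {
                w.interrupt();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("processed: " + processAll(50));
    }
}
```

One caveat with this design: if a worker enqueues follow-up tasks while processing, it has to call put() before taskDone(), otherwise the count can drop to zero too early. With several queues, each queue would carry its own count, which avoids a separate set of global counters.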


Solution

  • OK, since a general ExecutorService is not enough, perhaps ForkJoinPool will work. It does not expose its queue explicitly, but it should be very easy to use given what you have described.

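    The key method is awaitQuiescence(long timeout, TimeUnit unit), which waits until the pool is quiescent (all submitted tasks have finished execution) or the timeout elapses. It returns true if the pool became quiescent and false if the timeout elapsed first.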
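A minimal sketch of that approach, assuming the tasks can be submitted as plain Runnables. The class name QuiescenceDemo, the parallelism of 4 and the 10 second timeout are arbitrary choices for illustration, and the behaviour is worth verifying on the JDK version you run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QuiescenceDemo {
    // Submits taskCount trivial tasks, waits for quiescence, and returns the
    // number of completed tasks, or -1 if the timeout elapsed first.
    static int runAndWait(int taskCount) {
        ForkJoinPool pool = new ForkJoinPool(4); // parallelism chosen arbitrarily
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < taskCount; i++) {
                // Stands in for the real task processing.
                pool.execute(() -> completed.incrementAndGet());
            }
            // Waits until the pool is quiescent or the timeout elapses.
            boolean quiescent = pool.awaitQuiescence(10, TimeUnit.SECONDS);
            return quiescent ? completed.get() : -1;
        } finally {
            pool.shutdown(); // the pool is not needed after the demo
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAndWait(100));
    }
}
```

Unlike awaitTermination(), awaitQuiescence() does not require shutting the pool down first, so the same pool can keep accepting tasks after the wait returns.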