I have multiple queues of tasks and a consumer thread. The consumer thread should wake up as soon as any of the queues has a task. So I wonder what the best way is to implement this kind of communication.
Here are some implementations that would solve this, along with the reasons I want to find something different:

1. Create an object, call .wait(timeout) on it from the consumer thread, and call .notify() from the producer threads. However, wait/notify is a fairly low-level API, so I want to avoid it if possible. It's also not always correct: in some cases we can end up waiting for the whole timeout while there are tasks to do (the sleeping barber problem).

2. Something like a CountDownLatch with a reset method would do well, but I didn't find anything like that in java.util.concurrent. The implementation would be fairly simple, but reinventing the wheel is something I want to avoid even more than wait/notify. I also believe it has the same problem of waiting for the whole timeout as the wait/notify approach.

3. Wrap the tasks in a common Task type and have all producers write to the same queue, so that the consumer listens to a single queue. I believe this approach is actually very good in most cases, but in my case this part of the application has low-latency requirements, so I must avoid creating new objects (e.g. these wrappers). It would also increase contention on the tail of the queue (instead of one producer per queue, all of them would write there), which is not good for latency either.

So are there any other ways to implement this (maybe using some other concurrency primitives)?
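For illustration, the first approach (wait/notify with a timeout) could look roughly like the sketch below. The class name GuardedWaitSketch, the String task type, and the methods publish and awaitTask are hypothetical names chosen for this example, not something from the question. The sketch assumes that the consumer re-checks the queues while holding the lock before it waits, and that producers call notify() under the same lock after publishing. Under that assumption, a task published just before the wait should not leave the consumer sleeping for the whole timeout.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class GuardedWaitSketch {
    private final Object lock = new Object();
    final Queue<String> queueA = new ConcurrentLinkedQueue<>();
    final Queue<String> queueB = new ConcurrentLinkedQueue<>();

    // Producer side: publish the task first, then notify while holding the lock.
    void publish(Queue<String> queue, String task) {
        queue.add(task);
        synchronized (lock) {
            lock.notify();
        }
    }

    // Consumer side: returns a task from either queue, or null on timeout or interrupt.
    String awaitTask(long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        synchronized (lock) {
            while (true) {
                // Check the queues under the lock, so a notify cannot slip in
                // between this check and the wait below.
                String task = queueA.poll();
                if (task == null) {
                    task = queueB.poll();
                }
                if (task != null) {
                    return task;
                }
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    return null; // timed out with nothing to do
                }
                try {
                    // wait(0) would wait forever, so wait at least 1 ms.
                    lock.wait(Math.max(1L, remainingNanos / 1_000_000L));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return null;
                }
            }
        }
    }
}
```

This still relies on the low-level wait/notify API, so it only illustrates the approach and does not remove the first objection.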
How about using a task notification queue? In this scenario, whenever a task is added to any of the task queues, an item is also added to the notification queue.
The following snippet illustrates this approach:
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

public class Main<T1, T2> {

    Queue<T1> taskType1Queue = new ArrayBlockingQueue<T1>(10);
    Queue<T2> taskType2Queue = new ArrayBlockingQueue<T2>(10);
    // Holds "something was published" tokens; surplus signals are dropped when it is full.
    ArrayBlockingQueue<Boolean> notificationQueue = new ArrayBlockingQueue<Boolean>(2);

    public void produceType1(final T1 task) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                taskType1Queue.add(task); // throws IllegalStateException if the task queue is full
                notificationQueue.offer(true); // does not block if full
            }
        }).start(); // the thread has to be started, otherwise nothing is published
    }

    public void produceType2(final T2 task) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                taskType2Queue.add(task); // throws IllegalStateException if the task queue is full
                notificationQueue.offer(true); // does not block if full
            }
        }).start();
    }

    public void consume() {
        try {
            notificationQueue.take(); // wait until task1 or task2 has been published
            // Drain both task queues until neither has anything left.
            while (!Thread.currentThread().isInterrupted()) {
                T1 task1 = taskType1Queue.poll(); // does not block if queue is empty
                if (task1 != null) {
                    // do something
                }
                T2 task2 = taskType2Queue.poll(); // does not block if queue is empty
                if (task2 != null) {
                    // do something
                }
                if (task1 == null && task2 == null) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
            System.out.println("Consumer thread done");
        }
    }
}
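To see the notification-queue idea run end to end, here is a small self-contained sketch with two producers and one consumer. The names NotificationQueueDemo and runDemo, the String task type, and the use of ConcurrentLinkedQueue for the task queues are assumptions made for this example only. It also assumes a notification capacity of 1 is enough, since a single pending token already tells the consumer to go and look at the queues.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class NotificationQueueDemo {

    // Starts two producers and one consumer; returns the tasks in the order consumed.
    static List<String> runDemo(int tasksPerProducer) {
        Queue<String> queueA = new ConcurrentLinkedQueue<>();
        Queue<String> queueB = new ConcurrentLinkedQueue<>();
        BlockingQueue<Boolean> notifications = new ArrayBlockingQueue<>(1);
        List<String> processed = new ArrayList<>(); // touched only by the consumer thread
        int expected = 2 * tasksPerProducer;

        Thread consumer = new Thread(() -> {
            try {
                while (processed.size() < expected) {
                    notifications.take(); // block until some producer signals
                    boolean foundWork = true;
                    while (foundWork) { // drain both queues without blocking
                        String a = queueA.poll();
                        String b = queueB.poll();
                        if (a != null) processed.add(a);
                        if (b != null) processed.add(b);
                        foundWork = a != null || b != null;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread producerA = new Thread(() -> {
            for (int i = 0; i < tasksPerProducer; i++) {
                queueA.add("A" + i);               // publish the task first
                notifications.offer(Boolean.TRUE); // then signal; never blocks
            }
        });
        Thread producerB = new Thread(() -> {
            for (int i = 0; i < tasksPerProducer; i++) {
                queueB.add("B" + i);
                notifications.offer(Boolean.TRUE);
            }
        });

        consumer.start();
        producerA.start();
        producerB.start();
        try {
            producerA.join();
            producerB.join();
            consumer.join(); // join makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return processed;
    }
}
```

The ordering in the producers matters: the task is added before the token is offered, so by the time the consumer takes a token, the task that caused it is already visible in its queue. Note that the demo builds a new String per task, which a low-latency producer would avoid.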