I have a PriorityBlockingQueue
. A single thread consumes one message at a time from this queue and processes it. Several further threads are inserting messages into the queue. The producer threads assign an integral priority to each message they submit. A static AtomicLong
is used to assign every message a unique, monotonically incrementing ID. The Comparator
of the queue orders messages by this priority first, and then equal priority messages are ordered by ID (lowest ID first.)
Problem: Sometimes one producer submits a large number of messages. This then starves the other producers of having their messages processed. What I'd like to do, is have the consumer round-robin between producers for equal priority messages (while still processing equal priority messages of a single producer in submission order). But I cannot work out how to write the Comparator
to do this.
Another alternative I considered is having a separate queue for each producer. However, I don't think that could work, since I'm not aware of any way for single thread to wait on multiple queues.
I feel like it's more straightforward to implement this with one Queue
for each producer. One thread can't wait on multiple Queue
s, but you could combine all of the Queue
s into one helper class so that it doesn't need to.
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
public class RoundRobin<P, E> {
private final Lock lock = new ReentrantLock();
private final Condition added = lock.newCondition();
@GuardedBy("lock") private final Map<P, Queue<E>> queues = new LinkedHashMap<>();
public boolean add(P producer, E item) {
lock.lock();
try {
if (!queues.containsKey(producer)) {
queues.put(producer, new PriorityBlockingQueue<>());
}
added.signalAll();
return queues.get(producer).add(item);
} finally {
lock.unlock();
}
}
public Iterator<E> roundRobinIterator() {
return new Iterator<E>() {
private Iterator<? extends Queue<E>> i = null;
private boolean singlePass = true;
@Override
public boolean hasNext() {
return true;
}
@Override
public E next() {
lock.lock();
try {
while (true) {
if (i == null || !i.hasNext()) {
i = queues.values().iterator();
singlePass = true;
}
while (i.hasNext()) {
Queue<E> q = i.next();
if (!q.isEmpty()) {
if (singlePass) {
// copy the iterator to prevent
// ConcurrentModificationExceptions
singlePass = false;
i = copy(i);
}
return q.poll();
}
}
if (singlePass) {
// If singlePass is true then we just checked every
// queue and they were all empty.
// Wait for another element to be added.
added.await();
}
}
} catch (InterruptedException e) {
throw new NoSuchElementException(e.getMessage());
} finally {
lock.unlock();
}
}
private <T> Iterator<? extends T> copy(Iterator<? extends T> i) {
List<T> copy = new ArrayList<>();
while (i.hasNext()) {
copy.add(i.next());
}
return copy.iterator();
}
};
}
}