Search code examples
Tags: java, algorithm, concurrency, priority-queue, java.util.concurrent

PriorityBlockingQueue that ensures consecutive item sequence


I'm receiving a sequence of messages, and I want to process them in their sequential order. Each message has a sequence number. There's a pool of threads receiving them. I want to put them into a blocking queue like a PriorityBlockingQueue, and read them in the right order, blocking until the next consecutive message is available.

E.g. given this code:

// The queue needs to know each element's sequence number; an Integer is its own index.
ConsecutiveBlockingQueue<Integer> q = new ConsecutiveBlockingQueue<>(Integer::intValue);

// Two producers deliver the messages interleaved, so arrival order is not sequence order.
new Thread (()->{ q.put(0); q.put(2); }).start();
new Thread (()->{ q.put(1); q.put(3); }).start();

// take() may throw InterruptedException; the enclosing method has to declare or handle it.
List<Integer> ordered = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
    ordered.add(q.take());
}
System.out.println(ordered);

I want it to print [0, 1, 2, 3]


Solution

  • Here's a minimally tested class that seems to do what I want. Comments welcome.

    package com.ciphercloud.sdp.common;
    
    import java.util.AbstractQueue;
    import java.util.Collection;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.PriorityQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.function.ToIntFunction;
    
    /**
     * An unbounded {@link BlockingQueue} that hands elements to consumers strictly in
     * consecutive index order, regardless of the order in which producers insert them.
     *
     * <p>Every element carries an integer index, obtained through the function supplied at
     * construction. An element whose index equals the next expected index is released to
     * consumers immediately, together with any buffered elements that follow it without a gap.
     * Elements that arrive ahead of sequence are held back until the gap before them is filled.
     * Elements whose index has already been released (duplicates or late arrivals) are rejected.
     *
     * <p>Only released elements are visible through {@link #size()}, {@link #iterator()},
     * {@link #peek()}, the {@code poll}/{@code take} methods and {@code drainTo}; elements that
     * are still held back are not reported by any of them.
     *
     * <p>Insertions are serialized by an internal lock; retrieval operations delegate to an
     * internal {@link LinkedBlockingQueue}.
     *
     * @param <E> the element type
     */
    public class ConsecutiveBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {
        // extracts the sequence index of an element
        private final ToIntFunction <E> ixFunction;

        // blocking queue for consecutive items. Take operations will look into this queue
        final LinkedBlockingQueue <E> bQueue = new LinkedBlockingQueue<>();

        // buffering/ordering queue for items that are out of sequence. It is ordered by the
        // extracted index (not by the elements' natural ordering), so elements need not be
        // Comparable and the head is always the lowest buffered index.
        final PriorityQueue <E> pQueue;

        // guards nextIx, pQueue and the hand-over of elements into bQueue
        final ReentrantLock lock = new ReentrantLock();

        // index of the next element that may be released to consumers
        private int nextIx;

        /**
         * Creates a queue whose first expected index is {@code 0}.
         *
         * @param ixFunction extracts the sequence index from an element
         * @throws NullPointerException if {@code ixFunction} is null
         */
        ConsecutiveBlockingQueue(ToIntFunction <E> ixFunction) {
            this(0, ixFunction);
        }

        /**
         * Creates a queue whose first expected index is {@code startIx}.
         *
         * @param startIx index of the first element to release
         * @param ixFunction extracts the sequence index from an element
         * @throws NullPointerException if {@code ixFunction} is null
         */
        ConsecutiveBlockingQueue(int startIx, ToIntFunction <E> ixFunction) {
            if (ixFunction == null) {
                throw new NullPointerException("ixFunction");
            }
            this.nextIx = startIx;
            this.ixFunction = ixFunction;
            this.pQueue = new PriorityQueue<>(Comparator.comparingInt(ixFunction));
        }

        /** Returns an iterator over the released elements only. */
        @Override
        public Iterator <E> iterator() {
            return bQueue.iterator();
        }

        /** Returns the number of released elements; held-back elements are not counted. */
        @Override
        public int size() {
            return bQueue.size();
        }

        /** Returns the internal queue that holds the released elements. */
        protected BlockingQueue <E> delegate() {
            return bQueue;
        }

        /** Always returns {@code Integer.MAX_VALUE}. */
        @Override
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }

        /**
         * Moves all released elements into {@code c}.
         *
         * @throws IllegalArgumentException if {@code c} is this queue
         */
        @Override
        public int drainTo(Collection <? super E> c) {
            if (c == this) {
                throw new IllegalArgumentException("cannot drain a queue into itself");
            }
            return bQueue.drainTo(c);
        }

        /**
         * Moves at most {@code maxElements} released elements into {@code c}.
         *
         * @throws IllegalArgumentException if {@code c} is this queue
         */
        @Override
        public int drainTo(Collection <? super E> c, int maxElements) {
            if (c == this) {
                throw new IllegalArgumentException("cannot drain a queue into itself");
            }
            return bQueue.drainTo(c, maxElements);
        }

        /**
         * Inserts the element without waiting. An element whose index has already been
         * released is dropped.
         *
         * @throws NullPointerException if {@code e} is null
         */
        @Override
        public void put(E e) {
            offer(e);
        }

        /**
         * Inserts the element without waiting; the timeout is ignored.
         *
         * @return the result of {@link #offer(Object)}
         * @throws NullPointerException if {@code e} is null
         */
        @Override
        public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
            return offer(e);
        }

        /**
         * Inserts the element. If it carries the next expected index it is released to
         * consumers, followed by every buffered element that continues the sequence without a
         * gap. If it is ahead of sequence it is buffered.
         *
         * @return {@code true} if the element was released or buffered; {@code false} if its
         *         index is lower than the next expected index and it was therefore rejected
         * @throws NullPointerException if {@code e} is null
         */
        @Override
        public boolean offer(E e) {
            if (e == null) {
                throw new NullPointerException();
            }
            // The index depends only on the element, so compute it before taking the lock.
            final int ix = ixFunction.applyAsInt(e);

            lock.lock();
            try {
                if (ix < nextIx) {
                    // Already released. Buffering it would leave an entry at the head of
                    // pQueue that can never match nextIx and would block everything behind it.
                    return false;
                }
                if (ix > nextIx) {
                    // offered item is not consecutively next. Buffer it in pQueue
                    pQueue.offer(e);
                    return true;
                }

                // offered item is the next consecutive expected one
                bQueue.offer(e);
                nextIx++;
                releaseBuffered();
                return true;
            } finally {
                lock.unlock();
            }
        }

        /**
         * Moves buffered elements into the blocking queue while they follow consecutively,
         * discarding buffered duplicates of indices that were already released. Must be called
         * with {@code lock} held.
         */
        private void releaseBuffered() {
            E head;
            while ((head = pQueue.peek()) != null) {
                int headIx = ixFunction.applyAsInt(head);
                if (headIx > nextIx) {
                    // gap: the next consecutive item has not arrived yet
                    break;
                }
                pQueue.poll();
                if (headIx == nextIx) {
                    bQueue.offer(head);
                    nextIx++;
                }
                // headIx < nextIx: duplicate of an index that was already released; drop it
            }
        }

        /** Retrieves and removes the next released element, waiting until one is available. */
        @Override
        public E take() throws InterruptedException {
            return bQueue.take();
        }

        /**
         * Retrieves and removes the next released element, waiting up to the given timeout.
         *
         * @return the element, or {@code null} if the timeout elapsed first
         */
        @Override
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            return bQueue.poll(timeout, unit);
        }

        /** Retrieves and removes the next released element, or returns {@code null} if none. */
        @Override
        public E poll() {
            return bQueue.poll();
        }

        /** Retrieves, without removing, the next released element, or {@code null} if none. */
        @Override
        public E peek() {
            return bQueue.peek();
        }
    }