Search code examples
javaconcurrencyqueueconcurrenthashmap

Tracking the progress between Queues in a Map


I have currently two queues and items traveling between them. Initially, an item gets put into firstQueue, then one of three dedicated thread moves it to secondQueue and finally another dedicated thread removes it. These moves obviously include some processing. I need to be able to get the status of any item (IN_FIRST, AFTER_FIRST, IN_SECOND, AFTER_SECOND, or ABSENT) and I implemented it manually by doing the update of the statusMap where the queue gets modified like

while (true) {
    Item i = firstQueue.take();
    statusMap.put(i, AFTER_FIRST);
    process(i);
    secondQueue.add(i);
    statusMap.put(i, IN_SECOND);
}

This works, but it's ugly and leaves a time window where the status is inconsistent. The inconsistency is no big deal and it'd solvable by synchronization, but this could backfire as the queue is of limited capacity and may block. The ugliness bothers me more.

Efficiency hardly matters as the processing takes seconds. Dedicated threads are used in order to control concurrency. No item should ever be in multiple states (but this is not very important and not guaranteed by my current racy approach). There'll be more queues (and states) and they'll of different kinds (DelayQueue, ArrayBlockingQueue, and maybe PriorityQueue).

I wonder if there's a nice solution generalizable to multiple queues?


Solution

  • Does it make sense to wrap the queues with logic to manage the Item status?

    public class QueueWrapper<E> implements BlockingQueue<E> {
        private Queue<E> myQueue = new LinkedBlockingQueue<>();
        private Map<E, Status> statusMap;
    
        public QueueWrapper(Map<E, Status> statusMap) {
            this.statusMap = statusMap;
        }
    
        [...]
        @Override
        public E take() throws InterruptedException {
            E result = myQueue.take();
            statusMap.put(result, Status.AFTER_FIRST);
            return result;
        }
    

    That way status management is always related to (and contained in) queue operations...

    Obviously statusMap needs to be synchronized, but that would be an issue anyway.