Search code examples
javaconcurrencyqueuejava.util.concurrent

Concurrent Queue in Java that only retains the last item of each child thread


I have 1 main thread which starts up n child threads. Each of these child threads continually produce an new event and add it to the shared queue. That event represents the latest state of a complex calculation on the child thread.

The main thread consumes that shared queue, but it's not interested in processing all events still on the queue: if a single child thread posted 3 events to the queue, the main thread is only interested in the last one. The older events of that child thread can be discarded (as soon as the child thread adds the newer event).

For example:

childThread A adds event A1
mainThread removes event A1

childThread B adds event B1
childThread B adds event B2 // => B1 should be discarded
childThread B adds event B3 // => B2 should be discarded
mainThread removes event B3

childThread A adds event A2
childThread A adds event A3 // => A2 should be discarded
childThread B adds event B4
mainThread removes event A3
mainThread removes event B4

childThread B adds event B5
childThread A adds event A4
childThread A adds event A5 // => A4 should be discarded
childThread B adds event B6 // => B5 should be discarded
childThread A adds event A6 // => A5 should be discarded
mainThread removes event B6 // do B6 first because B5 was before A4
mainThread removes event A6

Optional requirement: The main thread does want to round-robin as much as possible over the events of the child threads, but still block if none of the child threads are producing.


Solution

  • I'd use two DataStructures: 1 BlockingQueue for "triggers" and 1 Map for reference to the latest event.

    EventSource will:

    1. On new Event make an entry to the Map (updating an older one when key already exists) Key = "this" (EventSource) , Value = Latest Event
    2. On new Event make an entry to the BlockingQueue with "this"(EventSource) but only if there is not already one in there.

    The last part is quite optional. It just reduces unsuccessful lookups into the map. You could as well just add triggers on each new Event. Everytime a lookup is negative for a key in the map just ignore it.

    The consumer will:

    1. Wait on the blocking queue for not empty.
    2. Remove top, use it as key into Event-Map ...
    3. Key not found: continue with next queue item.
    4. Key found: remove entry and process value (which is the event)
    5. Loop to 1.