Tags: java, scala, data-structures, scheduling

Time-based thread-safe priority queue


I need some queue-like data structure for the following task:

  1. Some thread adds data items with attached delay value (for example in seconds), e.g. queue.add(data, delay).
  2. Delays can differ or be identical, and the queue should act as a priority queue: an item with a smaller delay is closer to the head (gets dequeued sooner).
  3. Every second, the delay of each queued item should decrease by 1 until it reaches 0 (then it stays at 0).
  4. Among the items whose delay is 0, the dequeue order is their insertion order (though ordering by the time they reached 0 would be even better).
  5. Client threads repeatedly take elements from this queue, and it hands out only elements with delay = 0. If no such element exists, it either blocks or throws.

So I want queue functionality plus a little scheduling, thread-safe. I suspect this kind of thing is a fairly common task in some contexts.

My question: is there any production-ready solution for Java or Scala for this type of task? I'd rather not reinvent the wheel.

EDIT: it turns out there is exactly such a thing in the Java standard library: DelayQueue. Take a look at it before reading the answers below.
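For reference, here is a minimal sketch of how DelayQueue covers the requirements above. DelayQueue itself is part of `java.util.concurrent`; the `DelayedItem` wrapper class and its field names are my own, since DelayQueue only accepts elements implementing the `Delayed` interface:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Wrapper class (name is illustrative): DelayQueue elements must implement Delayed.
class DelayedItem implements Delayed {
    final String data;
    final long readyAtMillis; // absolute time at which the item becomes available

    DelayedItem(String data, long delayMillis) {
        this.data = data;
        this.readyAtMillis = System.currentTimeMillis() + delayMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

class DelayQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedItem> queue = new DelayQueue<>();
        queue.add(new DelayedItem("later", 500));
        queue.add(new DelayedItem("sooner", 100));

        // take() blocks until the head element's delay has expired,
        // so items come out in expiry order regardless of insertion order.
        System.out.println(queue.take().data); // prints "sooner"
        System.out.println(queue.take().data); // prints "later"
    }
}
```

Note that DelayQueue does not guarantee FIFO order among elements that expired at the same instant (requirement 4), but it handles blocking, thread safety, and delay ordering out of the box.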


Solution

  • You may use a PriorityQueue, which takes Comparable objects as input. Compare these objects based on a timestamp field (the smaller the better); as @Henry already mentioned, it is better to store a timestamp rather than a delay. This is very easy to implement: just store currentTime + delay.

    Then, when a client requests the head element, you need a synchronized method that does the following:

    • first peek() to check whether the head element has a timestamp <= currentTime
    • if yes, poll() that element; otherwise throw.
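    A minimal sketch of this approach (the class, entry type, and method names are illustrative, not from any library):

```java
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.PriorityQueue;

// Sketch of the timestamp-based priority queue; all names are illustrative.
class TimestampQueue<T> {
    // Pairs the payload with an absolute "ready" time (currentTime + delay).
    private static class Entry<T> {
        final T data;
        final long readyAtMillis;
        Entry(T data, long readyAtMillis) {
            this.data = data;
            this.readyAtMillis = readyAtMillis;
        }
    }

    // Smaller timestamp = higher priority, as the answer suggests.
    private final PriorityQueue<Entry<T>> pq =
        new PriorityQueue<>(Comparator.comparingLong((Entry<T> e) -> e.readyAtMillis));

    // Store currentTime + delay instead of the raw delay.
    public synchronized void add(T data, long delayMillis) {
        pq.add(new Entry<>(data, System.currentTimeMillis() + delayMillis));
    }

    // peek() first; poll() only if the head's timestamp has passed, else throw.
    public synchronized T poll() {
        Entry<T> head = pq.peek();
        if (head == null || head.readyAtMillis > System.currentTimeMillis()) {
            throw new NoSuchElementException("no expired element available");
        }
        return pq.poll().data;
    }
}
```

    Note this variant throws rather than blocks; a blocking version would need wait/notify or a Condition on top of the same structure.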

    Second solution (ported from my comment):

    Actually, one could add a ScheduledThreadPoolExecutor as an intermediate layer. Now you don't need a timestamp; just provide the delay to the executor.

    When each runnable/callable executes, the corresponding object is added to another, ordinary FIFO queue, where it becomes immediately available for polling.

    Your clients can now poll elements from this second FIFO queue.
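    This second layout could be sketched as follows, assuming a LinkedBlockingQueue as the FIFO hand-off queue (class and method names are illustrative):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: a scheduler moves items into a FIFO queue once their delay elapses.
class ScheduledHandoffQueue<T> {
    private final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
    private final BlockingQueue<T> ready = new LinkedBlockingQueue<>();

    // No timestamp needed: the executor handles the delay itself.
    public void add(T data, long delay, TimeUnit unit) {
        scheduler.schedule(() -> ready.add(data), delay, unit);
    }

    // Blocks until some item's delay has elapsed; expired items come out FIFO.
    public T take() throws InterruptedException {
        return ready.take();
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```

    A nice side effect: items land in the FIFO queue in the order their delays expire, which matches the "order of reaching 0" preference in requirement 4.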