Search code examples
javajava-8java-streampriority-queue

PriorityBlockingQueue stream in Java 8 is out of order


These two pieces of code have different order of output. First piece:

while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } else {
        return;
    }
}

Second piece:

jobQueue.stream()
        .filter(TimeoutJobRequest::isReady)
        .peek(jobQueue::remove)
        .forEach(this::execute);

Note that jobQueue is a PriorityBlockingQueue.

The reordering occurs only when this::execute is relatively long (like for a couple of seconds.)


Solution

  • The stream of PriorityBlockingQueue follows the Iterator order, which according to the documentation:

    The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.

    If you want the priority order, you need to poll the elements from the PriorityBlockingQueue.

    PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
    pq.add(5);
    pq.add(8);
    pq.add(3);
    
    System.out.println("-- Try 1 --");
    pq.stream().forEach(System.out::println);
    
    System.out.println("-- Try 2 --");
    IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);
    

    Output (it may depend on Java implementation):

    -- Try 1 --
    3
    8
    5
    -- Try 2 --
    3
    5
    8