Search code examples
javajava-8java-streamqueue

Stream API and Queues: Subscribe to BlockingQueue stream-style


Let's say we have a Queue

BlockingQueue<String> queue= new LinkedBlockingQueue<>();

and some other thread puts values in it, then we read it like

while (true) {
    String next = queue.take();
    System.out.println("next message:" + next);
}

How can I iterate over this queue in stream style, while maintaining similar semantics to above code.

This code only traverses the current queue state:

queue.stream().forEach(e -> System.out.println(e));

Solution

  • I'm guessing a bit at what you're expecting, but I think I have a good hunch.

    The stream of a queue, like iterating over a queue, represents the current contents of the queue. When the iterator or the stream reaches the tail of the queue, it doesn't block awaiting further elements to be added. The iterator or the stream is exhausted at that point and the computation terminates.

    If you want a stream that consists of all current and future elements of the queue, you can do something like this:

    Stream.generate(() -> {
            try {
                return queue.take();
            } catch (InterruptedException ie) {
                return "Interrupted!";
            }
        })
        .filter(s -> s.endsWith("x"))
        .forEach(System.out::println);   
    

    (Unfortunately the need to handle InterruptedException makes this quite messy.)

    Note that there is no way to close a queue, and there is no way for Stream.generate to stop generating elements, so this is effectively an infinite stream. The only way to terminate it is with a short-circuiting stream operation such as findFirst.