Let's say we have a Queue
BlockingQueue<String> queue= new LinkedBlockingQueue<>();
and some other thread puts values in it, then we read it like
while (true) {
String next = queue.take();
System.out.println("next message:" + next);
}
How can I iterate over this queue in stream style, while maintaining similar semantics to above code.
This code only traverses the current queue state:
queue.stream().forEach(e -> System.out.println(e));
I'm guessing a bit at what you're expecting, but I think I have a good hunch.
The stream of a queue, like iterating over a queue, represents the current contents of the queue. When the iterator or the stream reaches the tail of the queue, it doesn't block awaiting further elements to be added. The iterator or the stream is exhausted at that point and the computation terminates.
If you want a stream that consists of all current and future elements of the queue, you can do something like this:
Stream.generate(() -> {
try {
return queue.take();
} catch (InterruptedException ie) {
return "Interrupted!";
}
})
.filter(s -> s.endsWith("x"))
.forEach(System.out::println);
(Unfortunately the need to handle InterruptedException
makes this quite messy.)
Note that there is no way to close a queue, and there is no way for Stream.generate
to stop generating elements, so this is effectively an infinite stream. The only way to terminate it is with a short-circuiting stream operation such as findFirst
.