Is there a functional way to first collect the elements of a stream and then immediately pass the collection to a consumer? In other words a construct that waits for the stream to end before applying a terminal operation on all stream elements as a collection instead of one by one?
For example, can you implement the following as a one-liner:
Stream<Event> stream = // a stream of events
List<Event> list = stream.collect(Collectors.toList());
doProcessEvents(list);
As a workaround, I can (ab)use Collectors.collectingAndThen() together with a Function
to achieve the result I am looking for:
Function<List<Event>, Void> processingFunction = (list -> {
doProcessEvents(list);
return null;
});
Stream<Event> stream = // a stream of events
stream.collect(Collectors.collectingAndThen(Collectors.toList(), processingFunction);
Other alternatives that I considered (but that do not work) is if the collectingAndThen()
method had a Consumer
as second argument, e.g. Collectors.collectingAndThen(Collector downstream, Consumer consumer)
or if the Consumer
interface had a finish()
method that was executed after the last element in the stream was consumed:
class EventConsumer implements Consumer<Event> {
private final List<Event> list = new LinkedList<>();
@Override
public void accept(Event ev) {
events.add(ev);
}
public void finish() {
doProcessEvents(events);
}
}
// usage
Stream<Event> stream = // a stream of events
stream.forEach(new EventConsumer());
The problem with this approach is that the events would be saved in the internal list, but the finish()
method will not be invoked. It works with a slight modification, but still no one-liner:
Stream<Event> stream = // a stream of events
EventConsumer consumer = new EventConsumer()
stream.forEach(consumer);
consumer.finish();
In the end, I decided a different approach. Instead of trying to shoehorn together a collector and a consumer, I created an interface
interface StreamConsumer<T> {
void consume(Stream<T> stream);
}
and refactored the code as
void processEvents(StreamConsumer<Events> streamConsumer, Stream<Event> events) {
streamConsumer.consume(events);
}
What is the purpose of this? Now, I can implement different kinds of consumers, some that relies on List<Event>
, others that can consume Stream<Event>
, e.g.
class ListEventConsumer implements StreamConsumer<Event> {
private ListProcessor<Event> listProcessor;
ListEventConsumer(ListProcessor<Event> listProcessor) {
this.listProcessor = listProcessor;
}
@Override
void consume(Stream<Event> events) {
List<Event> list = events.collect(Collectors.toList());
listProcess.process(list);
}
}
as well as standard Java Consumer
implementations, e.g.
class FunctionalEventConsumer implements StreamConsumer<Event> {
private Consumer<Event> consumer;
FunctionalEventConsumer(Consumer<Event> consumer) {
this.consumer = consumer;
}
@Override
void consume(Stream<Event> events) {
events.forEach(consumer);
}
}
Now, I have a way of consuming Event
both as a Stream<Event>
as well as first collecting them in ListEventConsumer
class that delegates them to a ListProcessor<Event>
. With this approach, the class that delegates the event processing does not need to know what kind of StreamConsumer<Event>
is used:
// either a ListEventConsumer or a FunctionalConsumer
StreamConsumer<Event> streamConsumer = // ...
Stream<Event> events = // ...
processEvents(streamConsumer, events);