Search code examples
javajava-streamfunctional-interface

Java - Collecting consumer?


Is there a functional way to first collect the elements of a stream and then immediately pass the collection to a consumer? In other words a construct that waits for the stream to end before applying a terminal operation on all stream elements as a collection instead of one by one?

For example, can you implement the following as a one-liner:

Stream<Event> stream = // a stream of events
List<Event> list = stream.collect(Collectors.toList());
doProcessEvents(list);

As a workaround, I can (ab)use Collectors.collectingAndThen() together with a Function to achieve the result I am looking for:

Function<List<Event>, Void> processingFunction = (list -> {
    doProcessEvents(list);
    return null;
});

Stream<Event> stream = // a stream of events
stream.collect(Collectors.collectingAndThen(Collectors.toList(), processingFunction);

Other alternatives that I considered (but that do not work) is if the collectingAndThen() method had a Consumer as second argument, e.g. Collectors.collectingAndThen(Collector downstream, Consumer consumer) or if the Consumer interface had a finish() method that was executed after the last element in the stream was consumed:

class EventConsumer implements Consumer<Event> {
    private final List<Event> list = new LinkedList<>();

    @Override
    public void accept(Event ev) {
        events.add(ev);
    }

    public void finish() {
        doProcessEvents(events);
    }
}

// usage
Stream<Event> stream = // a stream of events
stream.forEach(new EventConsumer());

The problem with this approach is that the events would be saved in the internal list, but the finish() method will not be invoked. It works with a slight modification, but still no one-liner:

Stream<Event> stream = // a stream of events
EventConsumer consumer = new EventConsumer()
stream.forEach(consumer);
consumer.finish();

Solution

  • In the end, I decided a different approach. Instead of trying to shoehorn together a collector and a consumer, I created an interface

    interface StreamConsumer<T> {
       void consume(Stream<T> stream);
    }
    

    and refactored the code as

    
    void processEvents(StreamConsumer<Events> streamConsumer, Stream<Event> events) {
        streamConsumer.consume(events);
    }
    

    What is the purpose of this? Now, I can implement different kinds of consumers, some that relies on List<Event>, others that can consume Stream<Event>, e.g.

    class ListEventConsumer implements StreamConsumer<Event> {
    
        private ListProcessor<Event> listProcessor;
    
        ListEventConsumer(ListProcessor<Event> listProcessor) {
           this.listProcessor = listProcessor;
        }
    
        @Override
        void consume(Stream<Event> events) {
            List<Event> list = events.collect(Collectors.toList());
            listProcess.process(list);
        }
    }
    

    as well as standard Java Consumer implementations, e.g.

    class FunctionalEventConsumer implements StreamConsumer<Event> {
    
        private Consumer<Event> consumer;
    
        FunctionalEventConsumer(Consumer<Event> consumer) {
            this.consumer = consumer;
        }
    
        @Override
        void consume(Stream<Event> events) {
            events.forEach(consumer);
        }
    }
    

    Now, I have a way of consuming Event both as a Stream<Event> as well as first collecting them in ListEventConsumer class that delegates them to a ListProcessor<Event>. With this approach, the class that delegates the event processing does not need to know what kind of StreamConsumer<Event> is used:

    // either a ListEventConsumer or a FunctionalConsumer
    StreamConsumer<Event> streamConsumer = // ...
    Stream<Event> events = // ...
    
    processEvents(streamConsumer, events);