java, multithreading, concurrency, mapreduce, reactor

How to perform multi-threaded map/reduce using Reactor framework 2.x?


I previously asked this question for Reactor 1.x:

Let's say I have a Collection<Map>. I want to:

Transform each Map instance to an object of type Foo concurrently (each instance is totally independent of another - there is no need to convert each serially/iteratively).

When all of them are converted, I want a method, onReduce(Collection<Foo> foos), to be called. Its argument contains all of the resulting Foo instances.

But we can't seem to find an equivalent solution for Reactor 2.x, only single-threaded ones.

How do you perform multi-threaded map/reduce in Reactor 2.x? For example, how might you do this with an ExecutorService-based Dispatcher?


Solution

  • It's actually pretty easy now with Reactor 2.0. You could do something like this:

    List<Map<String, Object>> data = readData(); // <1>

    Streams.from(data)
           .flatMap(m -> Streams.just(m)
                                .dispatchOn(Environment.cachedDispatcher()) // <2>
                                .map(ignored -> Thread.currentThread().getName()))
           .buffer() // <3>
           .consume(s -> System.out.println("s: " + s)); // <4>
    
    1. Create a Stream based on the input data.
    2. Create a new Stream for each Map and dispatch map operations on the given Dispatcher.
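    3. Buffer all values until the stream completes; the buffered List is then sent downstream.
    4. Consume the List, which holds the results of the transforms that were load-balanced across the substreams. In this sample each result is just the name of the thread that ran the map step.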
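
    For comparison, the same fan-out/fan-in shape can be sketched with nothing but the JDK. This is not Reactor API: it swaps in CompletableFuture and a plain ExecutorService to show what the flatMap/dispatchOn/buffer pipeline above is doing conceptually. The class name MapReduceSketch, the helper mapConcurrently, the Foo fields, and the sample data are all hypothetical, and the pool size of 4 is an arbitrary assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class MapReduceSketch {

    // Hypothetical stand-in for the question's Foo type.
    public static final class Foo {
        public final String name;

        public Foo(String name) {
            this.name = name;
        }
    }

    // Map phase: submit one independent task per input element to the pool.
    // Reduce phase: wait for every task and collect the results in input order.
    public static <I, O> List<O> mapConcurrently(Collection<I> inputs,
                                                 Function<I, O> transform,
                                                 ExecutorService pool) {
        List<CompletableFuture<O>> futures = new ArrayList<>();
        for (I input : inputs) {
            futures.add(CompletableFuture.supplyAsync(() -> transform.apply(input), pool));
        }
        List<O> results = new ArrayList<>(futures.size());
        for (CompletableFuture<O> future : futures) {
            // join() blocks until this task is done and rethrows its failure, if any.
            results.add(future.join());
        }
        return results;
    }

    // Plays the role of the question's onReduce callback.
    static void onReduce(Collection<Foo> foos) {
        System.out.println("converted " + foos.size() + " maps");
    }

    public static void main(String[] args) {
        // Made-up sample input standing in for readData().
        List<Map<String, Object>> data = Arrays.asList(
                Collections.<String, Object>singletonMap("name", "a"),
                Collections.<String, Object>singletonMap("name", "b"),
                Collections.<String, Object>singletonMap("name", "c"));

        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Foo> foos = mapConcurrently(data, m -> new Foo((String) m.get("name")), pool);
            onReduce(foos);
        } finally {
            pool.shutdown();
        }
    }
}
```

    Unlike the Reactor version, this sketch blocks the calling thread while it waits for the results. It trades the non-blocking pipeline for something that is easy to step through when checking the map/reduce semantics.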