Tags: java, rx-java2, project-reactor, reactive-streams

Reactive Streams: How to wait for all publishers, by key?


Suppose I have 3 publishers and 1 processor. The publishers emit items of the form {key: <integer>, value: <object>, publisher_id: <string>}.

The publishers perform I/O operations, so:

  • On the one hand, I'd like the publishers to work on (roughly) N items at a given moment.
  • On the other hand, I'd like the consumer to merge the items that share a key into one single record (i.e. {key: <integer>, values: <list>}).

I've actually already implemented a FluxProcessor that has internal storage (a ConcurrentHashMap) to keep all the items. It manually calls request() for new items whenever CAPACITY has not been reached.
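For readers who want to picture the manual approach described above, here is a minimal sketch. It is not the FluxProcessor from the question: it swaps in the JDK's own java.util.concurrent.Flow API (Java 9+) so that it runs without extra dependencies, and it simplifies the request logic to "ask for CAPACITY items up front, then one more per item stored". The names ManualGrouping, Item, GroupingSubscriber and collect are all made up for illustration.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class ManualGrouping {
    static final int CAPACITY = 2; // stands in for N from the question

    // Hypothetical item type mirroring {key, value, publisher_id}.
    static final class Item {
        final int key;
        final Object value;
        final String publisherId;

        Item(int key, Object value, String publisherId) {
            this.key = key;
            this.value = value;
            this.publisherId = publisherId;
        }
    }

    // One subscriber per publisher; all of them write into the same shared store.
    static final class GroupingSubscriber implements Flow.Subscriber<Item> {
        private final Map<Integer, Queue<Object>> store;
        private final CountDownLatch done;
        private Flow.Subscription subscription;

        GroupingSubscriber(Map<Integer, Queue<Object>> store, CountDownLatch done) {
            this.store = store;
            this.done = done;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(CAPACITY); // initial demand: CAPACITY items
        }

        @Override
        public void onNext(Item item) {
            // Append the value to the list kept for this key.
            store.computeIfAbsent(item.key, k -> new ConcurrentLinkedQueue<>()).add(item.value);
            subscription.request(1); // replenish demand after storing one item
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Runs three in-memory publishers ("A", "B", "C") and returns the merged store.
    static Map<Integer, Queue<Object>> collect(int keysPerPublisher) {
        Map<Integer, Queue<Object>> store = new ConcurrentHashMap<>();
        String[] ids = {"A", "B", "C"};
        CountDownLatch done = new CountDownLatch(ids.length);
        for (String id : ids) {
            SubmissionPublisher<Item> publisher = new SubmissionPublisher<>();
            publisher.subscribe(new GroupingSubscriber(store, done));
            for (int key = 0; key < keysPerPublisher; key++) {
                publisher.submit(new Item(key, id + "-" + key, id));
            }
            publisher.close(); // onComplete follows the already submitted items
        }
        try {
            done.await(); // wait until every publisher has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return store;
    }

    public static void main(String[] args) {
        System.out.println(collect(3));
    }
}
```

Each key should end up with one value per publisher, although the order of values within a key depends on thread scheduling.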

I'd like to know whether there is built-in functionality for this in the RxJava 2 or Spring Reactor API.


Solution

  • Use merge, rebatchRequests and toMultimap with RxJava 2:

    Flowable<KeyValuePublisher> source1 = ...
    Flowable<KeyValuePublisher> source2 = ...
    Flowable<KeyValuePublisher> source3 = ...
    
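    Flowable.merge(
        // rebatchRequests(N) requests N items up front and tops up as items are consumed
        source1.rebatchRequests(N),
        source2.rebatchRequests(N),
        source3.rebatchRequests(N)
    )
    // Emits a single Map<key, Collection<value>> once all sources have completed
    .toMultimap(kvp -> kvp.key, kvp -> kvp.value)
    .subscribe(map -> System.out.println(map));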
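The snippet above does not define KeyValuePublisher, so here is a rough plain-Java sketch of what it could look like and of the map shape that toMultimap hands to the subscriber. This is only an analogue built on java.util.stream, not RxJava itself; the field names key, value and publisherId and the class MultimapShape are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical item type; the answer only relies on the `key` and `value` fields.
class KeyValuePublisher {
    final int key;
    final Object value;
    final String publisherId;

    KeyValuePublisher(int key, Object value, String publisherId) {
        this.key = key;
        this.value = value;
        this.publisherId = publisherId;
    }
}

class MultimapShape {
    // Groups values by key, giving the same key -> values shape as toMultimap.
    static Map<Integer, List<Object>> group(Collection<KeyValuePublisher> items) {
        return items.stream().collect(Collectors.groupingBy(
            kvp -> kvp.key,
            Collectors.mapping(kvp -> kvp.value, Collectors.toList())));
    }

    public static void main(String[] args) {
        // Items as they might arrive after merging three sources "A", "B" and "C".
        List<KeyValuePublisher> merged = Arrays.asList(
            new KeyValuePublisher(1, "a1", "A"),
            new KeyValuePublisher(2, "a2", "A"),
            new KeyValuePublisher(1, "b1", "B"),
            new KeyValuePublisher(1, "c1", "C"));
        System.out.println(group(merged));
    }
}
```

Note that toMultimap emits its map only after every merged source has completed, so it suits finite sources. In Reactor, the closest counterparts are probably Flux.merge, limitRate and collectMultimap.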