Search code examples
javareactive-programmingproject-reactorreactorreactive-streams

Project Reactor: Multiple Publishers making HTTP calls and one Subscriber to handle all results


The problem with the following code is that the subscriber sees only items of the first flux (i.e. only printing 1). Interestingly, if I add delayElements, it works fine.

This is a toy example, but my intention is to replace it with Flux's that make HTTP GET requests and emit their results (also, could be more than two).

So reformulating my question, I have a many-to-one relation that needs to be implemented. How to implement it, considering my case? Would you use some kind of Processor?

 public static void main(String[] args) throws Exception {
    Flux<Integer> flux1 = Flux.generate(emitter -> {
        emitter.next(1);
    });

    Flux<Integer> flux2 = Flux.generate(emitter -> {
        emitter.next(2);

    });

    Flux<Integer> merged = flux1.mergeWith(flux2);
    merged.subscribe(s -> System.out.println(s));

    Thread.currentThread().join();
}

Trying to achieve the same idea with a TopicProcessor but it suffers from the same issue:

public static void main(String[] args) throws Exception {
    Flux<Integer> flux1 = Flux.generate(emitter -> {
        emitter.next(1);
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
    });

    Flux<Integer> flux2 = Flux.generate(emitter -> {
        emitter.next(2);
        try {
            Thread.sleep(100);
        } catch (Exception e) {}
    });

    TopicProcessor<Integer> processor = TopicProcessor.create();
    flux1.subscribe(processor);
    flux2.subscribe(processor);

    processor.subscribe(s -> System.out.println(s));


    Thread.currentThread().join();
}

Solution

  • From the docs:

    Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.

    You're creating an infinite source here without a dedicated scheduler, so it's attempting to drain that source fully before merging - and that's why you have your issue.

    This may not be an issue in your real-world use case, since the result of the GET request, presumably, won't be infinite. However, if you want to make sure the results are interleaved regardless, you just need to make sure you set up each flux with its own scheduler (by calling subscribeOn(Schedulers.elastic()); on each Flux.)

    So your example then becomes:

    Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
            .subscribeOn(Schedulers.elastic());
    
    Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
            .subscribeOn(Schedulers.elastic());
    
    Flux<Integer> merged = flux1.mergeWith(flux2);
    merged.subscribe(s -> System.out.println(s));
    
    Thread.currentThread().join();