Search code examples
javareactive-programmingproject-reactor

creating a flux from a method returning mono


I learning how to change non-reactive code to reactive. As part of the exercise I took this sample code for finding primes in a range

public class FindPrime {

    private List<Integer> primes;

    public FindPrime() {
        primes = new ArrayList<>();
    }

    public List<Integer> allPrimes(int low, int high) {
        for (int i = low; i <= high; i++) {
            // skip is the number is 1
            // As 1 is neither prime nor composite
            if (i == 1) continue;

            // if the number is prime add the number to list
            if (isPrime(i) == 1) {
                primes.add(i);
            }
        }
        return primes;
    }

    private int isPrime(int n) {
        // for loop from 2 to sqrt(n)
        for (int i = 2; i <= Math.sqrt(n); i++) {
            // if the number is divisible return -1
            if (n % i == 0) {
                return -1;
            }
        }
        // return 1 if number is prime
        return 1;
    }
}

While rewriting it to reactive code I started with using Flux.fromIterbale on the List produced to return a flux, but then the code essentially is still blocking? specially the isPrime method. Hence I turned it into the following

public Mono<Integer> isPrime(int n) {
    for (int i = 2; i <= Math.sqrt(n); i++) {
        // if the number is divisible return -1
        if (n % i == 0) {
            return Mono.just(-1);
        }
    }
    // return 1 if number is prime
    return Mono.just(1);
}

Which works as expected for me if I access the Mono, JitteryPrimesEmitter being the class where I have the new method

JitteryPrimesEmitter emitter = new JitteryPrimesEmitter();
// One Value at a time
int input = 97;
new JitteryPrimesEmitter().isPrime(input)
            .filter(x -> x == 1)
            .subscribe(x -> System.out.println("Got a Prime " + input));

The next step was to emit a flux of primes, using this isPrime method, so I wrote the allPrimes method as follows

public Flux<Integer> allPrimes(int low, int high) {
    return Flux.create((FluxSink<Integer> sink) -> {
        for (int i = low; i <= high; i++) {
            // skip is the number is 1
            // As 1 is neither prime nor composite
            if (i == 1) continue;

            isPrime(i).filter(n -> n == 1)
                    .delayElement(Duration.ofMillis(new Random().nextInt(20) + 1))
                    .doOnNext(m -> System.out.println("Publishing new prime " + m + "for range " + low + "-" + high))
                    .doOnNext(sink::next);
        }
    });
}

The random delay is how I'm making it jittery to stimulate data flow from a slow I/O upstream. This doesn't work as if I call allPrimes with 10, 100 as input I see no results, here's the downstream call

// Flux of values
AtomicInteger sum = new AtomicInteger();
AtomicInteger highest = new AtomicInteger(0);
Flux<Integer> primesFlux = emitter.allPrimes(10, 100);
primesFlux.doOnSubscribe(s -> System.out.println("Subscribed downstream operation to primes flux"));

ConnectableFlux<Integer> primes = primesFlux.publish();
primes.doOnComplete(() -> System.out.println("Sum of Primes = " + sum.get()))
        .subscribe(i -> {
            sum.addAndGet(i);
            System.out.println("Adding emitted prime " + i + " to sum " + sum);
        });

primes.doOnComplete(() -> System.out.println("Highest Prime = " + highest.get()))
        .subscribe(i -> {
            System.out.println("Checking if " + i + " is highest prime in the range 10-100");
            highest.set(i);
        });

primes.connect();
System.out.println("Done Subscribing to Primes Flux");

Neither sum nor highest is populated, nor do I see any of the messages I'm printing in the downstream Flux pipelines, I do see that the isPrime mono method emits values as and when it's called from the allPrimes method.


Solution

  • Problem was my implementation of allPrimes here's the fixed version

    public Flux<Integer> allPrimes(int low, int high) {
        return Flux.range(low, high - low)
                .flatMap(it ->
                        isPrime(it).flatMap(result ->
                                result == 1 ? Mono.just(it) : Mono.empty()))
                .doOnNext(m -> System.out.println("Publishing new prime " + m + " for range " + low + "-" + high));
    }