I learning how to change non-reactive code to reactive. As part of the exercise I took this sample code for finding primes in a range
public class FindPrime {
private List<Integer> primes;
public FindPrime() {
primes = new ArrayList<>();
}
public List<Integer> allPrimes(int low, int high) {
for (int i = low; i <= high; i++) {
// skip is the number is 1
// As 1 is neither prime nor composite
if (i == 1) continue;
// if the number is prime add the number to list
if (isPrime(i) == 1) {
primes.add(i);
}
}
return primes;
}
private int isPrime(int n) {
// for loop from 2 to sqrt(n)
for (int i = 2; i <= Math.sqrt(n); i++) {
// if the number is divisible return -1
if (n % i == 0) {
return -1;
}
}
// return 1 if number is prime
return 1;
}
}
While rewriting it to reactive code I started with using Flux.fromIterbale
on the List produced to return a flux, but then the code essentially is still blocking? specially the isPrime
method. Hence I turned it into the following
public Mono<Integer> isPrime(int n) {
for (int i = 2; i <= Math.sqrt(n); i++) {
// if the number is divisible return -1
if (n % i == 0) {
return Mono.just(-1);
}
}
// return 1 if number is prime
return Mono.just(1);
}
Which works as expected for me if I access the Mono, JitteryPrimesEmitter
being the class where I have the new method
JitteryPrimesEmitter emitter = new JitteryPrimesEmitter();
// One Value at a time
int input = 97;
new JitteryPrimesEmitter().isPrime(input)
.filter(x -> x == 1)
.subscribe(x -> System.out.println("Got a Prime " + input));
The next step was to emit a flux of primes, using this isPrime
method, so I wrote the allPrimes
method as follows
public Flux<Integer> allPrimes(int low, int high) {
return Flux.create((FluxSink<Integer> sink) -> {
for (int i = low; i <= high; i++) {
// skip is the number is 1
// As 1 is neither prime nor composite
if (i == 1) continue;
isPrime(i).filter(n -> n == 1)
.delayElement(Duration.ofMillis(new Random().nextInt(20) + 1))
.doOnNext(m -> System.out.println("Publishing new prime " + m + "for range " + low + "-" + high))
.doOnNext(sink::next);
}
});
}
The random delay is how I'm making it jittery to stimulate data flow from a slow I/O upstream. This doesn't work as if I call allPrimes
with 10, 100 as input I see no results, here's the downstream call
// Flux of values
AtomicInteger sum = new AtomicInteger();
AtomicInteger highest = new AtomicInteger(0);
Flux<Integer> primesFlux = emitter.allPrimes(10, 100);
primesFlux.doOnSubscribe(s -> System.out.println("Subscribed downstream operation to primes flux"));
ConnectableFlux<Integer> primes = primesFlux.publish();
primes.doOnComplete(() -> System.out.println("Sum of Primes = " + sum.get()))
.subscribe(i -> {
sum.addAndGet(i);
System.out.println("Adding emitted prime " + i + " to sum " + sum);
});
primes.doOnComplete(() -> System.out.println("Highest Prime = " + highest.get()))
.subscribe(i -> {
System.out.println("Checking if " + i + " is highest prime in the range 10-100");
highest.set(i);
});
primes.connect();
System.out.println("Done Subscribing to Primes Flux");
Neither sum
nor highest
is populated, nor do I see any of the messages I'm printing in the downstream Flux pipelines, I do see that the isPrime
mono method emits values as and when it's called from the allPrimes
method.
Problem was my implementation of allPrimes
here's the fixed version
public Flux<Integer> allPrimes(int low, int high) {
return Flux.range(low, high - low)
.flatMap(it ->
isPrime(it).flatMap(result ->
result == 1 ? Mono.just(it) : Mono.empty()))
.doOnNext(m -> System.out.println("Publishing new prime " + m + " for range " + low + "-" + high));
}