I have an infinite hot flux of data. I want to carry out an operation on each element in the stream; each operation returns a Mono that will complete (one way or another) after some finite time.
There is the possibility of an error being thrown from these operations. If so, I want to resubscribe to the hot flux without missing anything, retrying elements that were in the middle of being processed when the error was thrown (i.e. anything that did not complete successfully).
What do I do here? I can tolerate repeated operations on the same elements, but not losing elements entirely from the stream.
I've tried using a ReplayProcessor to handle this, but I can't see a way of making it work without either repeating a lot of operations that may well have succeeded (by using a very conservative timeout) or losing elements because new elements overwrite old ones in the buffer (as below).
Test case:
    @Test
    public void fluxTest() {
        List<String> strings = new ArrayList<>();
        strings.add("one");
        strings.add("two");
        strings.add("three");
        strings.add("four");
        ConnectableFlux<String> flux = Flux.fromIterable(strings).publish();
        // Goes boom after three uses of its method, otherwise
        // returns a Mono, completing after a little time
        DangerousClass dangerousClass = new DangerousClass(3);
        ReplayProcessor<String> replay = ReplayProcessor.create(3);
        flux.subscribe(replay);
        replay.flatMap(dangerousClass::doThis)
                .retry(1)
                .doOnNext(s -> LOG.info("Completed {}", s))
                .subscribe();
        flux.connect();
        flux.blockLast();
    }
    public class DangerousClass {
        Logger LOG = LoggerFactory.getLogger(DangerousClass.class);
        private int boomCount;
        private AtomicInteger count;

        public DangerousClass(int boomCount) {
            this.boomCount = boomCount;
            this.count = new AtomicInteger(0);
        }

        public Mono<String> doThis(String s) {
            return Mono.fromSupplier(() -> {
                LOG.info("doing dangerous {}", s);
                if (count.getAndIncrement() == boomCount) {
                    LOG.error("Throwing exception from {}", s);
                    throw new RuntimeException("Boom!");
                }
                return s;
            }).delayElement(Duration.ofMillis(600));
        }
    }
This prints:
    doing dangerous one
    doing dangerous two
    doing dangerous three
    doing dangerous four
    Throwing exception from four
    doing dangerous two
    doing dangerous three
    doing dangerous four
    Completed four
    Completed two
    Completed three
The element "one" is never completed.
The error (at least in the above example) can only occur in the flatMap(dangerousClass::doThis) call, so resubscribing to the root Flux and replaying elements when this one flatMap() call has failed seems a bit odd, and probably isn't what you want to do.
Instead, I'd recommend ditching the ReplayProcessor and calling retry on the Mono returned inside the flatMap() call, so that only the failed element is retried. You end up with something like:
    ConnectableFlux<String> flux = Flux.range(1, 10).map(n -> "Entry " + n).publish();
    DangerousClass dangerousClass = new DangerousClass(3);
    flux.flatMap(x -> dangerousClass.doThis(x).retry(1))
            .doOnNext(s -> System.out.println("Completed " + s))
            .subscribe();
    flux.connect();
This will give you something like the following, with all entries completed and only the failed entry retried:
    doing dangerous Entry 1
    doing dangerous Entry 2
    doing dangerous Entry 3
    doing dangerous Entry 4
    Throwing exception from Entry 4
    doing dangerous Entry 4
    Completed Entry 2
    Completed Entry 1
    Completed Entry 3
    Completed Entry 4
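If it helps to see the idea without Reactor on the classpath, here is a rough, dependency-free sketch of the same "retry the element, not the stream" approach. It swaps Mono for the JDK's CompletableFuture, so it is an analogy rather than Reactor code. The names PerElementRetry, dangerous, withRetry and processAll are made up for illustration, and dangerous is only a stand-in for DangerousClass.doThis (without the delay).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class PerElementRetry {

    // Hypothetical stand-in for DangerousClass.doThis: the call that observes
    // count == boomCount throws, every other call returns its input.
    static Function<String, CompletableFuture<String>> dangerous(int boomCount, List<String> attempts) {
        AtomicInteger count = new AtomicInteger(0);
        return s -> CompletableFuture.supplyAsync(() -> {
            attempts.add(s); // record every attempt, including retries
            if (count.getAndIncrement() == boomCount) {
                throw new RuntimeException("Boom!");
            }
            return s;
        });
    }

    // Run op for a single item; on failure, re-run it for that item only,
    // at most `retries` more times. Other items are never touched.
    static <T, R> CompletableFuture<R> withRetry(
            Function<T, CompletableFuture<R>> op, T item, int retries) {
        return op.apply(item)
                .<CompletableFuture<R>>handle((result, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (retries > 0) {
                        return withRetry(op, item, retries - 1);
                    }
                    CompletableFuture<R> failed = new CompletableFuture<>();
                    failed.completeExceptionally(error);
                    return failed;
                })
                .thenCompose(future -> future); // flatten the nested future
    }

    // Start every item with its own retry budget, then wait for all of them.
    static List<String> processAll(List<String> items, int boomCount, List<String> attempts) {
        Function<String, CompletableFuture<String>> op = dangerous(boomCount, attempts);
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        for (String item : items) {
            inFlight.add(withRetry(op, item, 1));
        }
        List<String> completed = new ArrayList<>();
        for (CompletableFuture<String> f : inFlight) {
            completed.add(f.join());
        }
        return completed;
    }

    public static void main(String[] args) {
        List<String> attempts = new CopyOnWriteArrayList<>();
        List<String> completed = processAll(List.of("one", "two", "three", "four"), 3, attempts);
        System.out.println("Completed: " + completed);
        System.out.println("Attempts: " + attempts.size());
    }
}
```

With four items and a boomCount of 3, all four items complete and there are five attempts in total: the four initial calls plus one retry of whichever call failed. Which item hits the failure depends on thread scheduling, but no element is dropped and nothing that already succeeded is repeated.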