How can I get the last item of a Flux without collapsing it with reduce() or last()? Here is my use case:
1) I have a generator that produces Flux<T> based on state.
2) When the inner Flux completes, it alters the state, which affects the next Flux objects I emit in the generator.
Schematically, it looks like this:
static class State {
    int secret = 2;
    int iteration = 0;
}

Random rand = new Random(1024);
Flux<Integer> stream = Flux.<Flux<Integer>, State>generate(State::new, (state, sink) -> {
    System.out.println(String.format("Generate: %d", state.secret));
    Flux<Integer> inner = Flux.range(1, rand.nextInt(10));
    sink.next(inner.doOnComplete(() -> {
        // How do I get last item of `inner` here ?
        // For example I'd like to decrement `state.secret` by last value of `inner`
    }));
    return state;
}).flatMap(Function.identity());
UPD: I unmarked my answer because the hack turned out to be unreliable. It's possible that .generate() will be invoked before the previous Flux is fully consumed, which makes the value in last incorrect.
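To make the failure mode concrete, here is a rough plain-Java model of it (no Reactor involved; `EagerGenerateModel` and `startsSeenByGenerator` are names I made up for illustration). It assumes the downstream asks the generator for several inner sequences before any of them has been consumed, so every generator call reads the same stale state:

```java
import java.util.ArrayList;
import java.util.List;

public class EagerGenerateModel {

    static class State {
        int secret = 2;
    }

    // Hypothetical helper: calls the "generator" n times up front, then runs the
    // deferred completion callbacks. Returns the start value each call observed.
    static List<Integer> startsSeenByGenerator(int n) {
        State state = new State();
        List<Integer> starts = new ArrayList<>();
        List<Runnable> onCompleteCallbacks = new ArrayList<>();

        for (int i = 0; i < n; i++) {
            int start = state.secret;          // generator reads the state now
            starts.add(start);
            // Stand-in for doOnComplete: would store the last item of a
            // 4-element range starting at `start`, but only when consumed.
            onCompleteCallbacks.add(() -> state.secret = start + 3);
        }

        // The inner sequences are consumed only after all generator calls.
        onCompleteCallbacks.forEach(Runnable::run);
        return starts;
    }

    public static void main(String[] args) {
        // Every generator call saw the initial secret, not the updated one.
        System.out.println(startsSeenByGenerator(3));
    }
}
```

Whether this ordering actually happens in a real pipeline depends on the operators and on how the inner publishers are scheduled, so treat it as a picture of the problem rather than a reproduction.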
The first version was unreliable, so I hacked together another one:
static <T> Flux<T> expandOnLastItem(Supplier<Flux<T>> seed, Function<T, Flux<T>> generator) {
    return Flux.just(new AtomicReference<T>())
            .flatMap(last -> Flux.just(seed.get().materialize())
                    .flatMap(Function.identity())
                    .expand(v -> {
                        if (v.hasValue()) {
                            last.set(v.get());
                        } else if (v.isOnComplete() && last.get() != null) {
                            Flux<T> res = generator.apply(last.get());
                            last.set(null);
                            return res.materialize();
                        }
                        return Flux.empty();
                    })
                    .filter(s -> !s.isOnComplete())
                    .dematerialize());
}
It can be used like this:
static Flux<Integer> getPage(int pageId, int size) {
    return Flux.defer(() -> {
        if (pageId < 3) {
            System.out.println("Returning data for pageId: " + pageId);
            return Flux.range(pageId * 100, size);
        } else {
            System.out.println("Returning empty for pageId: " + pageId);
            return Flux.empty();
        }
    });
}

expandOnLastItem(
        () -> getPage(0, 5),
        lastId -> {
            System.out.println(" Expanding. Last item: " + lastId);
            int curPage = lastId / 100;
            return getPage(curPage + 1, 5);
        })
        .reduce(0L, (count, value) -> {
            System.out.println("==> " + value);
            return count + 1;
        })
        .block();
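For anyone trying to follow what the paging example is supposed to emit, here is a minimal plain-Java model of the same control flow. It uses eager lists instead of Flux, so it says nothing about backpressure or threading, and `ExpandOnLastItemModel` is a hypothetical name. The idea is: emit a chunk, take its last item, ask the generator for the next chunk, and stop at the first empty chunk.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class ExpandOnLastItemModel {

    // List-based stand-in for expandOnLastItem: keeps requesting the next
    // chunk from `generator`, keyed by the last item of the previous chunk.
    static <T> List<T> expandOnLastItem(Supplier<List<T>> seed, Function<T, List<T>> generator) {
        List<T> out = new ArrayList<>();
        List<T> chunk = seed.get();
        while (!chunk.isEmpty()) {
            out.addAll(chunk);
            T last = chunk.get(chunk.size() - 1);
            chunk = generator.apply(last);
        }
        return out;
    }

    // Same paging rule as getPage above: pages 0..2 have data, later pages are empty.
    static List<Integer> getPage(int pageId, int size) {
        if (pageId >= 3) {
            return Collections.emptyList();
        }
        List<Integer> page = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            page.add(pageId * 100 + i);
        }
        return page;
    }

    public static void main(String[] args) {
        List<Integer> all = ExpandOnLastItemModel.<Integer>expandOnLastItem(
                () -> getPage(0, 5),
                last -> getPage(last / 100 + 1, 5));
        System.out.println(all.size() + " items: " + all);
    }
}
```

With three non-empty pages of five items each, the model yields 15 items (0..4, 100..104, 200..204), which is the sequence the Reactor version is meant to produce before reduce() counts it.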
So I hacked it by mutating the state variable in the generator. It works, but it's not very functional. If anyone can suggest an alternative, I'd greatly appreciate it.
Random rand = new Random(1024);
Flux.<Flux<String>, State>generate(State::new, (state, sink) -> {
    if (state.iteration < 4) {
        final int count = rand.nextInt(10) + 1;
        System.out.println(String.format("*** Generate %d: start %d (count %d)", state.iteration, state.secret, count));
        Flux<Integer> inner = Flux.range(state.secret, count);
        final int[] last = {Integer.MIN_VALUE};
        sink.next(
                inner
                        .doOnNext(value -> {
                            last[0] = value;
                        })
                        .map(value -> String.format("Iter %d value %d", state.iteration, value))
                        .doOnComplete(() -> {
                            System.out.println(String.format("Inner complete (last item was %d)", last[0]));
                            state.secret = last[0];
                            state.iteration += 1;
                        }));
    } else {
        System.out.println("Generate complete");
        sink.complete();
    }
    return state;
})
        .flatMap(Function.identity())
        .map(value -> {
            System.out.println(String.format("Ext map: %s", value));
            return value;
        })
        .buffer(5)
        .flatMapIterable(Function.identity())
        .subscribe(value -> System.out.println(String.format(" ---> %s", value)));
System.out.println("Exiting");