How can I get last item of Flux without collapsing it with reduce() or last()

How can I get the last item of a Flux without collapsing it with reduce() or last()? Here is my use case:

1) I have a generator that produces Flux<T> based on state.
2) When an inner Flux completes, it alters the state, which affects the next Flux objects I emit in the generator.

Schematically, it looks like this:

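import java.util.Random;
import java.util.function.Function;
import reactor.core.publisher.Flux;

static class State {
    int secret = 2;
    int iteration = 0;
}

Random rand = new Random(1024);
Flux<Integer> stream = Flux.<Flux<Integer>, State>generate(State::new, (state, sink) -> {
    System.out.println(String.format("Generate: %d", state.secret));
    Flux<Integer> inner = Flux.range(1, rand.nextInt(10));
    sink.next(inner.doOnComplete(() -> {
        // How do I get last item of `inner` here ?
        // For example I'd like to decrement `state.secret` by last value of `inner`
    }));
    return state;
}).flatMap(Function.identity()); // assumed: flatten the inner Flux objects into one stream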

UPD: I unmarked my answer because the hack turned out to be unreliable. It is possible for .generate() to be invoked before the previous Flux is fully consumed, which makes the value in `last` incorrect.
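
To illustrate the failure mode described in the update, here is a minimal plain-Java simulation (no Reactor involved). It assumes that the downstream operator may request several inner sequences from the generator before it has consumed the first one, as flattening operators with a prefetch greater than one can do. `PrefetchDemo`, `generateInner` and `run` are hypothetical names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class PrefetchDemo {
    static class State {
        int secret = 2;
    }

    // Hypothetical stand-in for one generator call: the start value is read from
    // the state when the inner sequence is created, but the state is only
    // updated when that sequence is consumed later.
    static Supplier<List<Integer>> generateInner(State state) {
        final int start = state.secret;
        return () -> {
            List<Integer> values = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                values.add(start + i);
            }
            state.secret = values.get(values.size() - 1); // "last item" side effect
            return values;
        };
    }

    // Creates two inner sequences, generating `prefetch` (>= 1) of them
    // before any of them is consumed.
    static List<Integer> run(int prefetch) {
        State state = new State();
        List<Integer> out = new ArrayList<>();
        int generated = 0;
        while (generated < 2) {
            List<Supplier<List<Integer>>> batch = new ArrayList<>();
            for (int i = 0; i < prefetch && generated < 2; i++, generated++) {
                batch.add(generateInner(state));
            }
            for (Supplier<List<Integer>> inner : batch) {
                out.addAll(inner.get());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(1)); // [2, 3, 4, 4, 5, 6]
        System.out.println(run(2)); // [2, 3, 4, 2, 3, 4]
    }
}
```

With a prefetch of 1 the second sequence starts from the last item of the first one. With a prefetch of 2 both sequences are generated from the same stale state, which is the kind of incorrect result the update refers to.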


  • The first version was unreliable, so I hacked together another one:

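    static <T> Flux<T> expandOnLastItem(Supplier<Flux<T>> seed, Function<T, Flux<T>> generator) {
        return Flux.just(new AtomicReference<T>()) // one `last` holder per subscription
                .flatMap(last -> Flux.just(seed.get().materialize())
                        .flatMap(Function.identity())
                        .expand(v -> {
                            if (v.hasValue()) {
                                last.set(v.get()); // remember the most recent item
                            } else if (v.isOnComplete() && last.get() != null) {
                                Flux<T> res = generator.apply(last.get());
                                last.set(null); // reset so an empty result ends the expansion
                                return res.materialize();
                            }
                            return Flux.empty();
                        })
                        .filter(s -> !s.isOnComplete())
                        .<T>dematerialize());
    }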

    which can be used as

    static Flux<Integer> getPage(int pageId, int size) {
        return Flux.defer(() -> {
            if (pageId < 3) {
                System.out.println("Returning data for pageId: " + pageId);
                return Flux.range(pageId * 100, size);
            } else {
                System.out.println("Returning empty for pageId: " + pageId);
                return Flux.empty();
            }
        });
    }

    // Start with page 0 and derive each following page from the last item seen.
    expandOnLastItem(
            () -> getPage(0, 5),
            lastId -> {
                System.out.println("  Expanding. Last item: " + lastId);
                int curPage = lastId / 100;
                return getPage(curPage + 1, 5);
            })
            .reduce(0L, (count, value) -> {
                System.out.println("==> " + value);
                return count + 1;
            })
            .block(); // assumed terminal call: subscribes and waits for the count

    So I hacked it by mutating a state variable in the generator. It works, but it's not very functional. If anyone can suggest an alternative, I'd greatly appreciate it.

    Random rand = new Random(1024);
    Flux.<Flux<String>, State>generate(State::new, (state, sink) -> {
        if (state.iteration < 4) {
            final int count = rand.nextInt(10) + 1;
            System.out.println(String.format("*** Generate %d: start %d (count %d)", state.iteration, state.secret, count));
            Flux<Integer> inner = Flux.range(state.secret, count);
            // Single-element array so the lambdas below can write to it
            final int[] last = {Integer.MIN_VALUE};
            sink.next(
                    inner
                            .doOnNext(value -> {
                                last[0] = value;
                            })
                            .map(value -> String.format("Iter %d value %d", state.iteration, value))
                            .doOnComplete(() -> {
                                System.out.println(String.format("Inner complete (last item was %d)", last[0]));
                                state.secret = last[0];
                                state.iteration += 1;
                            }));
        } else {
            System.out.println("Generate complete");
            sink.complete();
        }
        return state;
    })
            .flatMap(Function.identity()) // assumed: flatten the inner Flux objects into one stream
            .map(value -> {
                System.out.println(String.format("Ext map: %s", value));
                return value;
            })
            .subscribe(value -> System.out.println(String.format("  ---> %s", value)));
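
For reference, the paging behaviour that `expandOnLastItem` is meant to produce in the `getPage` example above can be written as an ordinary blocking loop. This is only a plain-Java sketch with lists standing in for Flux; it assumes the same page layout as the example (pages 0 to 2 hold data, later pages are empty), and `PagingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class PagingSketch {
    // List-based stand-in for getPage: pages 0..2 hold `size` ids, later pages are empty.
    static List<Integer> getPage(int pageId, int size) {
        List<Integer> page = new ArrayList<>();
        if (pageId < 3) {
            for (int i = 0; i < size; i++) {
                page.add(pageId * 100 + i);
            }
        }
        return page;
    }

    // Blocking analogue of expandOnLastItem: emit a page, then use its last
    // item to fetch the next page, and stop at the first empty page.
    static <T> List<T> expandOnLastItem(Supplier<List<T>> seed, Function<T, List<T>> generator) {
        List<T> out = new ArrayList<>();
        List<T> page = seed.get();
        while (!page.isEmpty()) {
            out.addAll(page);
            T last = page.get(page.size() - 1);
            page = generator.apply(last);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> all = PagingSketch.<Integer>expandOnLastItem(
                () -> getPage(0, 5),
                lastId -> getPage(lastId / 100 + 1, 5));
        System.out.println(all.size() + " items, last = " + all.get(all.size() - 1));
        // prints "15 items, last = 204"
    }
}
```

The reactive version should emit the same 15 items in the same order, which gives a simple baseline to compare its output against.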