java project-reactor reactive-streams

How can I get the last item of a Flux without collapsing it with reduce() or last()?


How can I get the last item of a Flux without collapsing it with reduce() or last()? Here is my use case:

1) I have a generator that produces Flux<T> based on state.
2) When an inner Flux completes, it alters the state, which affects the next Flux objects I emit in the generator.

Schematically, it looks like this:

static class State {
    int secret = 2;
    int iteration = 0;
}

Random rand = new Random(1024);
Flux<Integer> stream = Flux.<Flux<Integer>, State>generate(State::new, (state, sink) -> {

    System.out.println(String.format("Generate: %d", state.secret));
    Flux<Integer> inner = Flux.range(1, rand.nextInt(10));

    sink.next(inner.doOnComplete(() -> {
        // How do I get last item of `inner` here ?
        // For example I'd like to decrement `state.secret` by last value of `inner`
    }));

    return state;
}).flatMap(Function.identity());

UPD: I unmarked my answer because the hack turned out to be unreliable. It is possible for .generate() to be invoked before the previous Flux is fully consumed, which makes the value in last incorrect.
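
The failure mode described in the update can be reproduced in isolation. The following is a minimal sketch, not the original code: the class name `GenerateRaceDemo`, the 20 ms delay, and the two counters are assumptions made for illustration. Each inner Flux is made asynchronous with `delayElements`, and the sketch records how many times the generator had already run when the first inner Flux completed. Because `flatMap` requests several inner publishers up front, that number is expected to be greater than one.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class GenerateRaceDemo {

    // Returns how many times the generator had been invoked at the moment
    // the first inner Flux signalled completion.
    static int generatorCallsAtFirstCompletion() {
        AtomicInteger generatorCalls = new AtomicInteger();
        AtomicInteger callsSeenAtFirstCompletion = new AtomicInteger(-1);

        Flux.<Flux<Integer>, Integer>generate(() -> 0, (iteration, sink) -> {
            generatorCalls.incrementAndGet();
            if (iteration < 4) {
                // Hypothetical async inner Flux: elements arrive on another thread.
                sink.next(Flux.range(1, 3)
                        .delayElements(Duration.ofMillis(20))
                        .doOnComplete(() -> callsSeenAtFirstCompletion
                                .compareAndSet(-1, generatorCalls.get())));
            } else {
                sink.complete();
            }
            return iteration + 1;
        })
                .flatMap(Function.identity())
                .blockLast();

        return callsSeenAtFirstCompletion.get();
    }

    public static void main(String[] args) {
        System.out.println("Generator calls before first inner completion: "
                + generatorCallsAtFirstCompletion());
    }
}
```

Any state written in `doOnComplete` of one inner Flux therefore arrives too late to influence the inner Flux objects that the generator has already emitted.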


Solution

  • The first version was unreliable, so I hacked together another one:

    static <T> Flux<T> expandOnLastItem(Supplier<Flux<T>> seed, Function<T, Flux<T>> generator) {
        return Flux.just(new AtomicReference<T>())
                .flatMap(last -> Flux.just(seed.get().materialize())
                        .flatMap(Function.identity())
                        .expand(v -> {
                            if (v.hasValue()) {
                                last.set(v.get());
                            } else if (v.isOnComplete() && last.get() != null) {
                                Flux<T> res = generator.apply(last.get());
                                last.set(null);
                                return res.materialize();
                            }
                            return Flux.empty();
                        })
                        .filter(s -> !s.isOnComplete())
                        .dematerialize());
    }
    

    which can be used as follows:

    static Flux<Integer> getPage(int pageId, int size) {
        return Flux.defer(() -> {
            if (pageId < 3) {
                System.out.println("Returning data for pageId: " + pageId);
                return Flux.range(pageId * 100, size);
            } else {
                System.out.println("Returning empty for pageId: " + pageId);
                return Flux.empty();
            }
        });
    }
    
    expandOnLastItem(
            () -> getPage(0, 5),
            lastId -> {
                System.out.println("  Expanding. Last item: " + lastId);
                int curPage = lastId / 100;
                return getPage(curPage + 1, 5);
            })
            .reduce(0L, (count, value) -> {
                System.out.println("==> " + value);
                return count + 1;
            })
            .block();
    

    For reference, the original (unreliable) version mutated a state variable in the generator. It works when the inner Flux objects are consumed synchronously, but it's not very functional. If anyone can suggest an alternative, I'd greatly appreciate it.

    Random rand = new Random(1024);
    Flux.<Flux<String>, State>generate(State::new, (state, sink) -> {
    
        if (state.iteration < 4) {
            final int count = rand.nextInt(10) + 1;
            System.out.println(String.format("*** Generate %d: start %d (count %d)", state.iteration, state.secret, count));
            Flux<Integer> inner = Flux.range(state.secret, count);
    
            final int[] last = {Integer.MIN_VALUE};
            sink.next(
                    inner
                            .doOnNext(value -> {
                                last[0] = value;
                            })
                            .map(value -> String.format("Iter %d value %d", state.iteration, value))
                            .doOnComplete(() -> {
                                System.out.println(String.format("Inner complete (last item was %d)", last[0]));
                                state.secret = last[0];
                                state.iteration += 1;
                            }));
        } else {
            System.out.println("Generate complete");
            sink.complete();
        }
    
        return state;
    })
            .flatMap(Function.identity())
            .map(value -> {
                System.out.println(String.format("Ext map: %s", value));
                return value;
            })
            .buffer(5)
            .flatMapIterable(Function.identity())
            .subscribe(value -> System.out.println(String.format("  ---> %s", value)));
    
    System.out.println("Exiting");
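
As one possible alternative to both hacks, the last item can be tracked per subscription and the next Flux chained with `concatWith`, which subscribes to its argument only after the current Flux has completed. This is a hedged sketch rather than a proven replacement: `chainOnLastItem` and the class name `ChainOnLastItem` are hypothetical, `getPage` is adapted from the example above (without the logging), and the recursion nests one `concatWith` per inner Flux, so it assumes a modest number of pages.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class ChainOnLastItem {

    // Emits every item of `current`, then continues with next.apply(lastItem).
    // The chain stops when a Flux completes without emitting anything.
    static <T> Flux<T> chainOnLastItem(Flux<T> current, Function<T, Flux<T>> next) {
        return Flux.<T>defer(() -> {
            // Fresh holder per subscription, so subscribers do not share state.
            AtomicReference<T> last = new AtomicReference<>();
            // Evaluated lazily, only once `current` has completed.
            Flux<T> continuation = Flux.<T>defer(() -> {
                T lastItem = last.get();
                if (lastItem == null) {
                    return Flux.<T>empty();
                }
                return chainOnLastItem(next.apply(lastItem), next);
            });
            return current.doOnNext(last::set).concatWith(continuation);
        });
    }

    static Flux<Integer> getPage(int pageId, int size) {
        return Flux.<Integer>defer(() -> {
            if (pageId < 3) {
                return Flux.range(pageId * 100, size);
            }
            return Flux.<Integer>empty();
        });
    }

    static List<Integer> collectAllPages() {
        return chainOnLastItem(getPage(0, 5), lastId -> getPage(lastId / 100 + 1, 5))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(collectAllPages());
    }
}
```

Items flow through unchanged, so the stream is not collapsed. `collectList()` appears only in the usage helper to make the result easy to inspect.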