Passing state downstream in a Flux


I recently implemented a CSV parser in which I used switchOnFirst to extract the header, which is then passed along in an AtomicReference. Here is a simplified version:

Flux<String> lines = Flux.just("HEADER", "line1", "line2", "line3", "line4");

AtomicReference<String> header = new AtomicReference<>();
AtomicInteger count = new AtomicInteger(0);

lines.switchOnFirst((sig, flux) -> {
        String item = sig.get();
        if (sig.isOnNext() && item != null) {
            header.set(item);
            return flux.skip(1);
        } 
        return Flux.error(() -> new RuntimeException("No header was found"));
})
.flatMap(line -> Mono.just(header.get() + ":" + line))
.doOnNext(x -> count.incrementAndGet())
.subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println(count.get() + " rows processed")
);

I am aware that there is Flux.deferContextual(..), but I couldn't figure out how to read context data from within the subscribe callbacks (to print out "X rows processed").

Is there a best practice for this?
Or is capturing atomic references, as above, still the better fit?

I looked for information on switchOnFirst and Flux.deferContextual(..), but I haven't found anything that suggests what the best practice is for this particular scenario, in which I am trying to:

  • first, parse a header and make it available to downstream emissions
  • second, include a count of emissions

Solution

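  • If you just want a count of the number of items processed, the count() operator should work. It returns a Mono<Long> that emits the total once the source completes, so the individual rows are no longer passed on to the subscriber: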

    ...
    .flatMap(line -> Mono.just(header.get() + ":" + line))
    .count()
    .doOnNext(x -> System.out.println(x + " count() rows processed"))
    .subscribe(
            System.out::println,
            System.err::println);
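
As for passing the header downstream, one option that avoids the AtomicReference is to keep the header as a local variable inside the switchOnFirst lambda and do the mapping there, so the header is captured per subscription. Reactor's Context travels from the subscriber up to upstream operators, so it is probably not a natural fit for handing a value downstream. The following is only a minimal sketch under those assumptions; the class name HeaderCountSketch and the helper withHeader are hypothetical names made up for illustration:

```java
import reactor.core.publisher.Flux;

public class HeaderCountSketch {

    // Hypothetical helper: prefixes every data line with the header
    // taken from the first element of the source.
    static Flux<String> withHeader(Flux<String> lines) {
        return lines.switchOnFirst((signal, flux) -> {
            String header = signal.get();
            if (signal.isOnNext() && header != null) {
                // The flux handed to switchOnFirst still contains the first
                // element, so skip it before mapping the remaining lines.
                return flux.skip(1).map(line -> header + ":" + line);
            }
            // Empty or failed source: same error as in the question.
            return Flux.error(() -> new RuntimeException("No header was found"));
        });
    }

    public static void main(String[] args) {
        Flux<String> lines = Flux.just("HEADER", "line1", "line2", "line3", "line4");

        withHeader(lines)
                .doOnNext(System.out::println) // print each row before it is counted
                .count()                       // Mono<Long> with the number of rows
                .subscribe(
                        n -> System.out.println(n + " rows processed"),
                        System.err::println);
    }
}
```

Because the header lives in the lambda's scope, each subscription gets its own copy, which should make the pipeline safe to subscribe to more than once, unlike a shared AtomicReference or AtomicInteger declared outside the chain.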