Tags: java, flux, project-reactor

How can I aggregate elements on a flux by group / how to reduce groupwise?


Assume you have a flux of objects with the following structure:

class Element {
  String key;
  int count;
}

Now imagine those elements arrive in a predefined sort order, so all elements with the same key always arrive consecutively, like

{ key = "firstKey",  count=123}
{ key = "firstKey",  count=1  }
{ key = "secondKey", count=4  }
{ key = "thirdKey",  count=98 }
{ key = "thirdKey",  count=5  }
 .....

What I want is a flux that emits one element per distinct key, carrying the summed count of that key's group. This is basically a classic reduce applied to each group, but the reduce operator does not work here: it emits only a single element for the whole flux, whereas I want a flux with one element per distinct key.
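For reference, the intended result can be pinned down with a plain, non-reactive Java sketch. It is only an illustration under stated assumptions: the constructor, `toString` and the `reduceRuns` helper are hypothetical additions, keys are assumed to be non-null, and the input is assumed to be already grouped by key.

```java
import java.util.ArrayList;
import java.util.List;

public class GroupwiseReduce {
    // Mirrors the question's Element class; constructor and toString are hypothetical additions for this sketch.
    static final class Element {
        final String key;
        final int count;
        Element(String key, int count) { this.key = key; this.count = count; }
        @Override public String toString() { return "{key=" + key + ", count=" + count + "}"; }
    }

    // Sums counts over each run of consecutive elements sharing a key.
    // Assumes non-null keys and grouped input, so a key never reappears after its run ends.
    static List<Element> reduceRuns(List<Element> input) {
        List<Element> out = new ArrayList<>();
        String currentKey = null; // the only state needed: current key and its running sum
        int sum = 0;
        for (Element e : input) {
            if (currentKey != null && !currentKey.equals(e.key)) {
                out.add(new Element(currentKey, sum)); // key changed: the previous run is complete
                sum = 0;
            }
            currentKey = e.key;
            sum += e.count;
        }
        if (currentKey != null) {
            out.add(new Element(currentKey, sum)); // flush the last run
        }
        return out;
    }

    public static void main(String[] args) {
        List<Element> input = List.of(
            new Element("firstKey", 123),
            new Element("firstKey", 1),
            new Element("secondKey", 4),
            new Element("thirdKey", 98),
            new Element("thirdKey", 5));
        System.out.println(reduceRuns(input));
        // prints [{key=firstKey, count=124}, {key=secondKey, count=4}, {key=thirdKey, count=103}]
    }
}
```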

Using bufferUntil might work, but it has the drawback that I have to keep state to check whether the key has changed compared to the previous element.

Using groupBy is overkill: I know that each group has ended as soon as a new key appears, so I don't want to keep anything cached after that point.

Is such an aggregation possible with Flux without keeping state outside of the flow?


Solution

  • This is currently (as of 3.2.5) not possible without keeping track of state yourself. distinctUntilChanged could have fit the bill with minimal state, but it emits only the values it considers "distinct" according to that state, not the state itself.

    The most minimalistic way of solving this is with windowUntil and compose + an AtomicReference for state-per-subscriber:

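    import java.util.concurrent.atomic.AtomicReference;
    import reactor.core.publisher.Flux;
    import reactor.util.function.Tuple2;
    import reactor.util.function.Tuples;

    Flux<Tuple2<T, Integer>> sourceFlux = ...; //assuming key/count represented as `Tuple2`, with T the key type
    //note: compose was deprecated in Reactor 3.3 in favor of transformDeferred
    Flux<Tuple2<T, Integer>> aggregated = sourceFlux.compose(source -> {
        //having this state inside a compose means it will not be shared by multiple subscribers
        AtomicReference<T> last = new AtomicReference<>(null);
    
        return source
          //use "last seen" state to split into windows, much like a `groupBy` but with earlier closing
          .windowUntil(i -> !i.getT1().equals(last.getAndSet(i.getT1())), true)
          //reduce each window into a single (key, summed count) tuple
          .flatMap(window -> window.reduce((i1, i2) -> Tuples.of(i1.getT1(), i1.getT2() + i2.getT2())));
    });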
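To see why the predicate splits the stream correctly, here is a Reactor-free simulation in plain Java. It is a sketch, not Reactor's implementation: `windowUntilCutBefore` is a hypothetical helper that only imitates `windowUntil(predicate, true)`, and `Map.Entry` stands in for `Tuple2`. The very first element always satisfies the predicate (`last` is still `null`), so the simulation produces an empty leading window; since `reduce(BiFunction)` on an empty Flux also completes without a value, such a window would not contribute to the aggregated output either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

public class WindowUntilSimulation {

    // Hypothetical helper imitating windowUntil(predicate, cutBefore = true):
    // a matching element closes the current window and becomes the first element of the next one.
    static <T> List<List<T>> windowUntilCutBefore(List<T> input, Predicate<T> cut) {
        List<List<T>> windows = new ArrayList<>();
        List<T> current = new ArrayList<>();
        for (T item : input) {
            if (cut.test(item)) {
                windows.add(current); // may be empty, e.g. before the very first element
                current = new ArrayList<>();
            }
            current.add(item);
        }
        windows.add(current);
        return windows;
    }

    static List<Map.Entry<String, Integer>> aggregate(List<Map.Entry<String, Integer>> source) {
        // Same predicate as the answer: cut whenever the key differs from the last seen key.
        AtomicReference<String> last = new AtomicReference<>(null);
        Predicate<Map.Entry<String, Integer>> keyChanged =
            e -> !e.getKey().equals(last.getAndSet(e.getKey()));

        List<Map.Entry<String, Integer>> result = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> window : windowUntilCutBefore(source, keyChanged)) {
            // An empty window reduces to Optional.empty(), so it adds nothing to the result.
            window.stream()
                  .reduce((a, b) -> Map.entry(a.getKey(), a.getValue() + b.getValue()))
                  .ifPresent(result::add);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> source = List.of(
            Map.entry("firstKey", 123),
            Map.entry("firstKey", 1),
            Map.entry("secondKey", 4),
            Map.entry("thirdKey", 98),
            Map.entry("thirdKey", 5));
        System.out.println(aggregate(source));
        // prints [firstKey=124, secondKey=4, thirdKey=103]
    }
}
```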