Given an infinite flux of objects, where each object has an ID, how can I use flux to create a buffered list of updates grouped by ID property (keeping the last emitted value)? Thanks
Example
Obj(ID=A, V=1)
Obj(ID=A, V=2)
Obj(ID=B, V=3)
--- buffer -> I want to subscribe with a list of [Obj(ID=A, V=2), Obj(ID=B, V=3)]
Obj(ID=A, V=1)
Obj(ID=B, V=4)
Obj(ID=B, V=6)
Obj(ID=A, V=2)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=6), Obj(ID=A, V=2)]
Obj(ID=B, V=1)
--- buffer -> I want to subscribe with a list of [Obj(ID=B, V=1)]
Something like the following would be perfect but it seems to wait the end of the flux in my tests instead of buffering.
flux
.buffer(Duration.ofMillis(2000))
.groupBy(Obj::getId)
.flatMap(GroupedFlux::getLast)
.collectToList()
.subscribe(this::printList);
It works with buffer and custom logic for grouping
public static void main(String[] args) {
flux.buffer(Duration.ofMillis(2000)).subscribe(this::groupList);
}
private void groupList(List<T> ts) {
Collection<T> values = ts.stream()
.collect(Collectors.toMap(T::getId, Function.identity(), (k, v) -> v))
.values();
System.out.println(values);
}
I was able to achieve it with the reactive grouping
flux.window(Duration.ofMillis(2000))
.flatMap(window -> window.groupBy(Entry::getId)
.flatMap(GroupedFlux::last)
.collectList()
)
.subscribe(this::printList);