Here's the code that I have for buffering and converting incoming events:
public Publisher<Collection<EventTO>> logs(String eventId) {
ConnectableObservable<Event> connectableObservable = eventsObservable
.share().publish();
connectableObservable.connect();
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.map(eventsMapper::mapCollection);
}
The problem here is that Flowable
returns an empty list each second although there are no events published to the eventsObservable
.
Is there a way to hold .buffer
until there is at least one object?
Note: Looks like there is a way to do it in C# (described here: https://stackoverflow.com/a/30090185/668148). But how can it be done in Java?
As suggested by Mark Keen, .distinctUntilChanged
does the trick.
So the following code will push the list of events if there are 1+ items after buffering:
connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
.filter(event -> event.getId().equals(eventId))
.buffer(1, TimeUnit.SECONDS, 50)
.distinctUntilChanged() // <<<======
.map(eventsMapper::mapCollection);