Tags: java, rx-java, reactive-programming, reactivex

RxJava buffering - ignoring zero items


Here's the code that I have for buffering and converting incoming events:

public Publisher<Collection<EventTO>> logs(String eventId) {
    ConnectableObservable<Event> connectableObservable = eventsObservable
        .share().publish();
    connectableObservable.connect();

    return connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
        .filter(event -> event.getId().equals(eventId))
        .buffer(1, TimeUnit.SECONDS, 50)
        .map(eventsMapper::mapCollection);
}

The problem is that the Flowable emits an empty list every second, even when no events are published to eventsObservable.

Is there a way to hold .buffer until there is at least one object?

Note: Looks like there is a way to do it in C# (described here: https://stackoverflow.com/a/30090185/668148). But how can it be done in Java?


Solution

  • As suggested by Mark Keen, .distinctUntilChanged does the trick.

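    So the following code stops the stream of repeated empty lists, because a buffer that equals the previous one is dropped. Note that an empty list can still get through once, either as the very first emission or right after a non-empty buffer, and consecutive non-empty buffers that compare equal are also collapsed into one: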

    connectableObservable.toFlowable(BackpressureStrategy.BUFFER)
        .filter(event -> event.getId().equals(eventId))
        .buffer(1, TimeUnit.SECONDS, 50)
        .distinctUntilChanged()             // drops a buffer equal to the previous one
        .map(eventsMapper::mapCollection);
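
If the goal is to never see an empty list at all, an alternative worth trying is a plain emptiness check right after the buffer, i.e. `.filter(events -> !events.isEmpty())` in place of `.distinctUntilChanged()`. To make the difference concrete, here is a small plain-Java simulation of what each approach lets through for a sequence of one-second buffers. It does not use RxJava; the class and method names (`BufferFilterSketch`, `distinctUntilChanged`, `nonEmptyOnly`) are made up for illustration and only mimic the operators' behavior on an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BufferFilterSketch {

    // Mimics distinctUntilChanged(): keep a buffer only if it differs
    // (by equals) from the buffer that came right before it.
    static <T> List<List<T>> distinctUntilChanged(List<List<T>> buffers) {
        List<List<T>> out = new ArrayList<>();
        List<T> previous = null;
        for (List<T> buffer : buffers) {
            if (previous == null || !buffer.equals(previous)) {
                out.add(buffer);
            }
            previous = buffer;
        }
        return out;
    }

    // Mimics filter(events -> !events.isEmpty()): drop every empty buffer.
    static <T> List<List<T>> nonEmptyOnly(List<List<T>> buffers) {
        List<List<T>> out = new ArrayList<>();
        for (List<T> buffer : buffers) {
            if (!buffer.isEmpty()) {
                out.add(buffer);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical sequence of buffers, one per second.
        List<List<String>> buffers = Arrays.asList(
            Collections.<String>emptyList(),
            Collections.<String>emptyList(),
            Arrays.asList("a"),
            Arrays.asList("a"),
            Collections.<String>emptyList(),
            Arrays.asList("b"));

        System.out.println(distinctUntilChanged(buffers)); // [[], [a], [], [b]]
        System.out.println(nonEmptyOnly(buffers));         // [[a], [a], [b]]
    }
}
```

With the first approach two empty lists still reach the subscriber and the second `[a]` is lost; with the emptiness check, only non-empty buffers come through and nothing else is dropped.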