
Is there a better way to get the intersection of two Observables using RxJava?


Using RxJava, I have one source Observable that emits a number of items, and I want to intersect it with another Observable that emits the same type. After working through a number of options, it seems that the most coherent way to structure things is this:

Observable<String> source = ...emits 20 items

Observable.create(subscriber -> {
    source
        .buffer(5)
        .subscribe(things -> {
            tocheck.getMatches(things) //emits 3 matches
                // Ignore each inner completion so that only the source completes the result.
                .subscribe(subscriber::onNext, subscriber::onError, () -> {});
        }, subscriber::onError, subscriber::onCompleted);
});

The expected output here is that when I subscribe to the resulting Observable, I get 12 items emitted: 20 source items in buffers of 5 gives 4 buffers, and each buffer yields 3 matches. Buffering is a requirement because of the contract of getMatches.
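
To make those numbers concrete, here is a rough plain-Java sketch of the same batching, with no RxJava involved. The class name, the sample data and this in-memory getMatches are all made up for illustration; the real getMatches returns an Observable and is not shown here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class BatchedIntersection {

    // Hypothetical stand-in for tocheck.getMatches: returns the members of
    // one batch that are also present in the other data set.
    static List<String> getMatches(List<String> batch, Set<String> other) {
        List<String> matches = new ArrayList<>();
        for (String item : batch) {
            if (other.contains(item)) {
                matches.add(item);
            }
        }
        return matches;
    }

    // Splits the source into batches of batchSize (the role buffer(5) plays)
    // and concatenates the matches of every batch (the role flatMap plays).
    static List<String> intersect(List<String> source, Set<String> other, int batchSize) {
        List<String> result = new ArrayList<>();
        for (int start = 0; start < source.size(); start += batchSize) {
            int end = Math.min(start + batchSize, source.size());
            result.addAll(getMatches(source.subList(start, end), other));
        }
        return result;
    }

    // Assumed sample data: 20 source items.
    static List<String> sampleSource() {
        List<String> source = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            source.add("item" + i);
        }
        return source;
    }

    // Assumed sample data: the other set holds the first 3 of every 5 items.
    static Set<String> sampleOther() {
        Set<String> other = new HashSet<>();
        for (int i = 0; i < 20; i++) {
            if (i % 5 < 3) {
                other.add("item" + i);
            }
        }
        return other;
    }

    public static void main(String[] args) {
        List<String> result = intersect(sampleSource(), sampleOther(), 5);
        System.out.println(result.size() + " matches: " + result);
    }
}
```

With this sample data each of the 4 batches contributes 3 matches, so the run reports 12 matches in total. The reactive version should emit the same 12 items, just one at a time.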

On its face this seems like it would work, but it does not seem like the cleanest way. Filter doesn't seem to apply here because, for performance reasons, I cannot run the intersect check on every item individually. I toyed around with flatMap, but the getMatches Observable completes the stream, instead of the completion notification coming from the source Observable.

Is there a better way to structure this?

Edit: To clarify what was happening with this style of code:

Observable<String> source = ...emits 20 items

source
    .buffer(5)
    .flatMap(this::getMatches);  //final observable would emit a total of 12 items

This is clearly way cleaner, but when I add some logging (assuming the same data sizes as the original snippet):

source
    .doOnEach(notification -> {
        log.trace("Processing {}", notification.getValue());
    })
    .buffer(5)
    .flatMap(this::getMatches)
    .doOnEach(notification -> {
        log.trace("Processing after match {}", notification.getValue());
    });

I get 20 instances of the "Processing" log line, then strangely only a few "Processing after match" log lines (when I would expect 12). It seems that onCompleted is being called earlier than it should be. Maybe I'm structuring something wrong?


Solution

  • So it looks like AndroidEx was spot on. I am using the Redis Lettuce reactive API, and it does not appear to be behaving correctly. The buffer/flatMap snippet added in the edit above is the correct way to structure an intersection of two Observables.