Tags: rxjs, rx-java, reactive-programming, rx-kotlin, rx-kotlin2

Rx (RxKotlin) - rightGroupJoin using groupJoin - merge / combine two observables of different types


After struggling for a few days with what seems to be a simple task, I come to you guys :)

The idea is simple. I have two streams/observables, 'left' and 'right'. I want items from 'right' to be buffered/collected/aggregated into the 'current' item of 'left'.
Thus, each item in 'left' opens a new 'window', and all 'right' items bind to that window until a new 'left' item is emitted. To visualize:

Task:
'left'     : |- A - - - - - B - - C - - - -|
'right'   : |- 1 - 2 - 3 -4 - 5 - 6 - - -|
'result' : |- - - - - - - -x - - -y - - - -z|   ( Pair<Left, List<Right>>)
Where: A,1 ; B,4 (so x) ; C (so y) are emitted at the same time
So:       x = Pair(A, [1,2,3]),   y = Pair(B, [4, 5])
And:    'right' & 'result' complete/terminate when 'left' does
So:      z = Pair(C, [6]) - emitted as a result of 'left' completing
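
To pin down the grouping described above, here is a minimal plain-Kotlin sketch that models the two streams as one ordered event log instead of real observables. The names Event, Left, Right and groupWithPreviousLeft are hypothetical and exist only for this illustration. It also assumes that 'right' items arriving before the first 'left' item are simply dropped.

```kotlin
// Plain-Kotlin model of the marble diagram: no Rx involved, just an ordered event log.
// All names here are hypothetical, chosen for illustration only.
sealed class Event
data class Left(val value: String) : Event()
data class Right(val value: Int) : Event()

// Attach each 'right' item to the most recent 'left' item (the window it falls into).
fun groupWithPreviousLeft(events: List<Event>): List<Pair<String, List<Int>>> {
    val result = mutableListOf<Pair<String, MutableList<Int>>>()
    for (event in events) {
        when (event) {
            // every 'left' item opens a new, initially empty window
            is Left -> result.add(Pair(event.value, mutableListOf()))
            // assumption: 'right' items seen before the first 'left' have no window and are dropped
            is Right -> result.lastOrNull()?.second?.add(event.value)
        }
    }
    return result.map { Pair(it.first, it.second.toList()) }
}

fun main() {
    val events = listOf(
        Left("A"), Right(1), Right(2), Right(3),
        Left("B"), Right(4), Right(5),
        Left("C"), Right(6)
    )
    println(groupWithPreviousLeft(events)) // [(A, [1, 2, 3]), (B, [4, 5]), (C, [6])]
}
```

This only captures which items end up together, not when each pair is emitted or how completion propagates, which is where the Rx operators come in.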

----
EDIT 2 - FINAL SOLUTION!
In order to aggregate 'right' items with the next 'left' and not the previous one, I changed the code to this much shorter/simpler one:

fun <L, R> Observable<L>.rightGroupJoin(right: Observable<R>): Observable<Pair<L, List<R>>> {
    return this.share().run {
        zipWith(right.buffer(this), BiFunction { left, rightList ->
            Pair(left, rightList)
        })
    }
}  

EDIT 1 - initial solution!
Taken from @Mark's (accepted) answer below, here's what I came up with.
It is separated into smaller methods because I also do multiRightGroupJoin() to join in as many (right) streams as I want.

fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {
    return this.share().let { thisObservable ->    //use 'share' to avoid multi-subscription complications, e.g. multi calls to **preceding** doOnComplete
        thisObservable.flatMapSingle { t ->        //treat each 'left' as a Single
            bufferRightOnSingleLeft(thisObservable, t, right)
        }
    }
}

Where:

private fun <T, R> bufferRightOnSingleLeft(left: Observable<*>, leftSingleItem: T, right: Observable<R>)
    : Single<Pair<T, MutableList<R>>> {

    return right.buffer(left)                              //buffer 'right' until 'left' onNext() (for each 'left' Single) 
        .map { Pair(leftSingleItem, it) }
        .first(Pair(leftSingleItem, emptyList()))   //should be only 1 (list). THINK firstOrError
}  

----

What I got so far
After much reading, and having understood that there is somehow no out-of-the-box implementation for this, I decided to use groupJoin, mostly following this link, like so (there are many problems and places to improve here, don't use this code):

private fun <T, R> Observable<T>.rightGroupJoin(right: Observable<R>): Observable<Pair<T, List<R>>> {

    var thisCompleted = false //THINK is it possible to make the groupJoin complete on the left(this)'s onComplete automatically?
    val thisObservable = this.doOnComplete { thisCompleted = true }
        .share() //avoid weird side-effects of multiple onSubscribe calls

    //join/attach 'right/other' stream to windows (buffers), starting and ending on each 'this/left' onNext
    return thisObservable.groupJoin(

        //bind 'right/other' stream to 'this/left'
        right.takeUntil { thisCompleted }, //have an onComplete rule THINK add share() at the end?

        //define when windows start/end ('this/left' onNext opens new window and closes prev)
        Function<T, ObservableSource<T>> { thisObservable },

        //define 'right/other' stream to have no windows/intervals/aggregations by itself
        // -> immediately bind each emitted item to a 'current' window(T) above
        Function<R, ObservableSource<R>> { Observable.empty() },

        //collect the whole 'right' stream in 'current' ('left') window
        BiFunction<T, Observable<R>, Single<Pair<T, List<R>>>> { t, rObs ->
            rObs.collect({ mutableListOf<R>() }) { acc, value ->
                acc.add(value)
            }.map { Pair(t, it.toList()) }
        }
    ).mergeAllSingles()
}

I also used a similar approach to create a timedBuffer(): the same as buffer(timeout), but with a timestamp on each buffer (List) to know when it started. It basically runs the same code with an Observable.interval(timeout) as the 'left'.

Problems / Questions (from the easiest to the hardest)

  1. Is it the best way of doing something like that? Isn't it an overkill?
  2. Is there a better way (must be) for completing the 'result' (and 'right') when 'left' is completed? Without this ugly boolean logic?
  3. This usage seems to mess up the order of the Rx callbacks. See the code and output below:

    leftObservable
    .doOnComplete {
        log("doOnComplete - before join")
     }
    .doOnComplete {
        log("doOnComplete 2 - before join")
     }
    .rightGroupJoin(rightObservable)
    .doOnComplete {
        log("doOnComplete - after join")
     }
    

Prints (sometimes! Looks like a race condition) the following:
doOnComplete - before join
doOnComplete - after join
doOnComplete 2 - before join

  4. On the first run of the above code, 'doOnComplete - after join' is not called; on the second run it's called twice. The third run is like the first, the fourth is like the second, etc.
    Both 3 and 4 were run using this code. It probably has something to do with the subscribe {} usage? Note that I don't hold the disposable. This stream finishes because I GC the 'left' observable

    leftObservable.subscribeOn().observeOn()
    .doOnComplete{log...}
    .rightGroupJoin()
    .doOnComplete{log...}
    .subscribe {}  
    

Note1: adding .takeUntil { thisCompleted } after mergeAllSingles() seems to fix #4.

Note2: After using this method to join multiple streams and applying 'Note1', it's apparent that the onComplete (before the groupJoin() call !!!) will be called as many times as there are 'right' Observables, which probably means the cause is the right.takeUntil { thisCompleted }. Is it really important to close the 'right' stream?

Note3: concerning Note1, it seems very much related to takeUntil vs. takeWhile. Using takeWhile reduces the number of doOnComplete calls, which is somewhat logical. Still trying to figure it out better.

  5. Can you think of a multiGroupJoin, or in our case, multiRightGroupJoin, other than running zip on groupJoin * rightObservablesCount?

Please ask anything you like. I know for a fact that my usage of the subscribe/disposable and the manual onComplete are not the right way, I'm just not sure enough what is.


Solution

  • Something as simple as this should work :

    @JvmStatic
    fun main(string: Array<String>) {
        val left = PublishSubject.create<String>()
        val right = PublishSubject.create<Int>()
    
        left.flatMapSingle { s ->  right.buffer(left).map { Pair(s, it) }.firstOrError() }
                .subscribe{ println("Group : Letter : ${it.first}, Elements : ${it.second}") }
    
    
        left.onNext("A")
        right.onNext(1)
        right.onNext(2)
        right.onNext(3)
        left.onNext("B")
        right.onNext(4)
        right.onNext(5)
        left.onNext("C")
        right.onNext(6)
        left.onComplete()
    }
    

    Output :

    Group : Letter : A, Elements : [1, 2, 3]
    Group : Letter : B, Elements : [4, 5]
    Group : Letter : C, Elements : [6]
    

    Your Observable of interest is the left, so subscribe to it. Then just buffer the right by the left observable's next emission or completion. You're only ever interested in a single result for each upstream left emission, so just use flatMapSingle. I chose firstOrError(), but you could obviously use a default item or other error handling, or even flatMapMaybe coupled with firstElement().

    Edit

    OP has had further Q&A and discovered that the behaviour in the original question and the above solution (buffering right values with the previous left emission, until the next left emission) is not what is required. The new required behaviour is to buffer right values to the NEXT left emission, like so:

    @JvmStatic
        fun main(string: Array<String>) {
            val left = PublishSubject.create<String>()
            val right = PublishSubject.create<Int>()
    
    
            left.zipWith (right.buffer(left), 
                    BiFunction<String, List<Int>, Pair<String, List<Int>>> { t1, t2 -> Pair(t1, t2)
            }).subscribe { println("Group : Letter : ${it.first}, Elements : ${it.second}") }
    
            left.onNext("A")
            right.onNext(1)
            right.onNext(2)
            right.onNext(3)
            left.onNext("B")
            right.onNext(4)
            right.onNext(5)
            left.onNext("C")
            right.onNext(6)
            left.onComplete()
        }
    

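    This produces a different final result, as each left value is zipped with the right values emitted before it, since the previous left emission (the inverse of the above). Note that 6 never appears in the output, because no further left item arrives to be paired with it.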

    Output :

    Group : Letter : A, Elements : []
    Group : Letter : B, Elements : [1, 2, 3]
    Group : Letter : C, Elements : [4, 5]
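
    The "next left" pairing can also be checked without Rx. Below is a minimal plain-Kotlin sketch over an ordered log of emissions; Emission, LeftItem, RightItem and groupWithNextLeft are hypothetical names used only for illustration, and the sketch models which items get paired, not timing or completion.

```kotlin
// Plain-Kotlin model (no Rx): one ordered log of 'left' and 'right' emissions.
// All names here are hypothetical, chosen for illustration only.
sealed class Emission
data class LeftItem(val value: String) : Emission()
data class RightItem(val value: Int) : Emission()

// Pair each 'left' item with the 'right' items collected since the previous 'left' item.
fun groupWithNextLeft(log: List<Emission>): List<Pair<String, List<Int>>> {
    val result = mutableListOf<Pair<String, List<Int>>>()
    var pending = mutableListOf<Int>()
    for (emission in log) {
        when (emission) {
            is RightItem -> pending.add(emission.value)
            is LeftItem -> {
                result.add(Pair(emission.value, pending))
                pending = mutableListOf() // start collecting for the next 'left' item
            }
        }
    }
    // Anything still pending has no following 'left' item and is not emitted.
    return result
}

fun main() {
    val log = listOf(
        LeftItem("A"), RightItem(1), RightItem(2), RightItem(3),
        LeftItem("B"), RightItem(4), RightItem(5),
        LeftItem("C"), RightItem(6)
    )
    println(groupWithNextLeft(log)) // [(A, []), (B, [1, 2, 3]), (C, [4, 5])]
}
```

    The trailing 6 stays in the pending list and is never paired, which matches the output listed above.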