Search code examples
javakotlinrx-javaobservablereactivex

RxJava Sliding Window


I have an observable which emits data and I want to initially buffer it for three seconds and then there must be a sliding of one second after initial buffer. This is more like buffer(timespan,unit,skip) where the skip is on the timespan.

Sample:

ObservableData,TimeStamp : (5,1),(10,1.5),(30,2.8),(40,3.2),(60,3.8),(90,4.2)

ExpectedList : {5,10,30},{10,30,40,60},{30,40,60,90}

I can achieve this by creating a custom operator. I just want to know is there any way to do it without relying on the custom operator.


Solution

  • I think it can be solved with builtin operators. Code below demonstrates one of the approaches, though things can get tricky when used for hot or non-lightweight cold sources - I encourage you to use it for educational/ get-an-idea purposes, and not production use

     @Test
    fun slidingWindow() {
        val events = Flowable.just(
                Data(5, 1.0),
                Data(10, 1.5),
                Data(30, 2.8),
                Data(40, 3.2),
                Data(60, 3.8),
                Data(90, 4.2))
                .observeOn(Schedulers.io())
        val windows = window(windowSize = 3, slideSize = 1, data = events).toList().blockingGet()
        Assert.assertNotNull(windows)
        Assert.assertFalse(windows.isEmpty())
    }
    
    fun window(windowSize: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> = window(
            from = 0,
            to = windowSize,
            slideSize = slideSize,
            data = data)
    
    fun window(from: Int, to: Int, slideSize: Int, data: Flowable<Data>): Flowable<List<Int>> {
        val window = data.takeWhile { it.time <= to }.skipWhile { it.time < from }.map { it.data }
        val tail = data.skipWhile { it.time <= from + slideSize }
        val nonEmptyWindow = window.toList().filter { !it.isEmpty() }
        val nextWindow = nonEmptyWindow.flatMapPublisher {
            window(from + slideSize, to + slideSize, slideSize, tail).observeOn(Schedulers.io())
        }
        return nonEmptyWindow.toFlowable().concatWith(nextWindow)
    }
    
    data class Data(val data: Int, val time: Double)
    

    The test above yields
    [[5, 10, 30], [10, 30, 40, 60], [30, 40, 60, 90], [40, 60, 90], [90]]