Search code examples
kotlinrx-javarx-kotlin

Rxjava - How to get the current and the previous item?


How do to use operators so that i always get the previous and the current value? If possible i want to avoid creating state outside the pipe.

- time ->
1      2      3      4
|      |      |      |
Operations
       |      |      |
       (1,2)  (2,3)  (3,4)

Note that every value besides the first and the last one have to appear twice, so a simple buffer won't do.

I thought about combining skip with merge and buffer but merge does not seem to guarantee ordering.

val s = PublishSubject.create<Int>()
s.mergeWith(s.skip(1)).buffer(2).subscribe{i -> print(i)}
s.onNext(1)
s.onNext(2)
s.onNext(3)
s.onNext(4)


output:
[1, 2][2, 3][3, 4]

val o = Observable.just(1,2,3,4)
o.mergeWith(o.skip(1)).buffer(2).subscribe{i -> print(i)}

output:
[1, 2][3, 4][2, 3][4]

(the sole 4 is fine, and expected)


Solution

  • Looks like you still can use buffer:

    Observable.just(1, 2, 3, 4)
        .buffer(2, 1)
        .subscribe { println(it) }
    
    // prints
    // [1, 2]
    // [2, 3]
    // [3, 4]
    // [4]