Search code examples
rxjsrx-javareactive-programming

Rx - How to apply to stream distinctUntilChanged that allows emit duplicate item after some period?


So, I have to implement some chain or perhaps some custom RxJava operator to an Observable, that will distinct items emitted from Observable, until they change, but only for some short period (like 1 sec), then duplicated item can be emitted again.

What I need is some nested combination of distinctUntilChanged, with throttle?

Main requirements are:

  • different items have to be emitted without any delay
  • the same item can not be emitted twice in given period

I couldn't find any operator that matches my requirement so, probably I'll need to write some custom Rx operator, but still I can't figure out how to start


Solution

  • So, I figured out, turned out to be quite easy to implement:

    fun <T> Observable<T>.distinctUntil(time: Long): Observable<T> {
        return this
            .timestamp()
            .distinctUntilChanged { old, new ->
                new.value() == old.value() && new.time() - old.time() < time
            }
            .map { it.value() }
    }