Search code examples
javaobservableintervalsrx-java2reactivex

Span out values emitted by an Observable


I'm a begginer when it comes to ReactiveX, so this question might be trivial, but I haven't found an answer through searching.

I have an observable that emits items quite irregularly (from ~4 times per second to once every 5 seconds) and I want to make sure it never emits an item more often that once per second. I thought about using zip operator with Observable.interval(), but I realized that if it emits an item after 5 seconds and then emits 3 items in less than a second, all of those items will be emitted in one second.

Is there any simple way to do it?


Solution

  • There are a couple of ways to achieve what you want. What you end up using depends on you use case.

    throttleLast

    throttle let's you specify that you only want a new value at a certain interval. This is probably closest to what you were trying to achieve with zip and interval:

    myObservable.throttleLast(1, TIMEUNIT.SECOND)
    

    This will emit the most recent signal emitted every second. The other signals emitted inside the second are discarded.

    buffer

    buffer does much the same as throttleLast, but instead of passing the most recently emitted value, it will return a Flowable with all the values emitted in the timespan.

    myObservable.buffer(1, TIMEUNIT.SECOND)