Search code examples
androidrx-java

RXJava - continuously emit values for determined period


I have a heart-rate sensor that emits a value periodically (anywhere between 500-3000 milli). When the heart rate sensor emits is non-deterministic. With RXJava, i would like to have a constant emitting the 'last seen' heart rate value and the constant emits the value for up to 10 Seconds until it marks it as too-stale & sends a NULL instead. NULL denotes that the heart rate sensor is no longer emitting sensor readings.

I have the following (kotlin) code:

    val heartRateObservable: Observable<BLEDataValue> = observable
            .flatMap { it.setupNotification(characteristic.uniqueIdentifier) }
            .flatMap { it }
            .map { BTDataPacket(characteristic.uniqueIdentifier, BleParseable(it)).btValue() }.onErrorReturn { BLEDataValueHeartRate(null) }
    return Observable.combineLatest(Observable.interval(1000, TimeUnit.MILLISECONDS), heartRateObservable, BiFunction { _, t2 -> t2 })

Question: Is it possible to introduce a way to replay the last seen heart-rate value up to when the last value becomes stale (i.e. after not seeing any heart-rate readings for 10 seconds).. when a heart rate value is seen it replay this until a new heart-rate value arrives OR the timeout of 10 seconds passes as the last value is now too-stale?


Solution

  • You can use either takeWhile or takeUntil operators to complete your rate observable.

    wait for 10 emissions :

    Observable heartbeat = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .takeWhile(e -> e < 10)
    

    or by using a timer witj takeUntil

    Observable heartbeat = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .takeUntil(Observable.timer(10000, TimeUnit.MILLISECONDS))
    

    After that you said :

    ... and emit this for up to 10,000 Milliseconds from the last received sensor value after ...

    For that you can use switchMap (If I've understood your question)

    heartRateObservable
    .switchMap(data -> heartbeat.map(data))
    

    Hence the heartbeat starts emitting values after each last emitted data.

    val heartRateObservable: Observable<BLEDataValue> = observable
                .flatMap { it.setupNotification(characteristic.uniqueIdentifier) }
                .flatMap { it }
                .map { BTDataPacket(characteristic.uniqueIdentifier, BleParseable(it)).btValue() }.onErrorReturn { BLEDataValueHeartRate(null) }
    return heartRateObservable
           .switchMap { data -> Observable.interval(1000, TimeUnit.MILLISECONDS)
                         .takeWhile(e -> e < 10) 
                         .map(l -> data)
                      }