I have a heart-rate sensor that emits a value periodically, at non-deterministic intervals of anywhere between 500 and 3000 ms. With RxJava, I would like a steady stream that emits the 'last seen' heart-rate value and keeps emitting it for up to 10 seconds, until the value is considered too stale and NULL is sent instead. NULL denotes that the heart-rate sensor is no longer emitting readings.
I have the following Kotlin code:
val heartRateObservable: Observable<BLEDataValue> = observable
    .flatMap { it.setupNotification(characteristic.uniqueIdentifier) }
    .flatMap { it }
    .map { BTDataPacket(characteristic.uniqueIdentifier, BleParseable(it)).btValue() }
    .onErrorReturn { BLEDataValueHeartRate(null) }
return Observable.combineLatest(
    Observable.interval(1000, TimeUnit.MILLISECONDS),
    heartRateObservable,
    BiFunction { _, t2 -> t2 })
Question: Is it possible to replay the last seen heart-rate value until it becomes stale (i.e. after 10 seconds without any new heart-rate reading)? Once a heart-rate value is seen, it should be replayed until either a new value arrives or the 10-second timeout passes and the last value is too stale.
You can use either the takeWhile or the takeUntil operator to complete the heartbeat observable.
To stop after 10 emissions:
Observable<Long> heartbeat = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .takeWhile(e -> e < 10);
Or, using a timer with takeUntil:
Observable<Long> heartbeat = Observable.interval(1000, TimeUnit.MILLISECONDS)
    .takeUntil(Observable.timer(10000, TimeUnit.MILLISECONDS));
After that, you said:
... and emit this for up to 10,000 Milliseconds from the last received sensor value after ...
For that you can use switchMap (if I've understood your question correctly):
heartRateObservable
    .switchMap(data -> heartbeat.map(tick -> data))
Hence the heartbeat restarts each time a new value arrives and replays that value. Applied to your code:
val heartRateObservable: Observable<BLEDataValue> = observable
    .flatMap { it.setupNotification(characteristic.uniqueIdentifier) }
    .flatMap { it }
    .map { BTDataPacket(characteristic.uniqueIdentifier, BleParseable(it)).btValue() }
    .onErrorReturn { BLEDataValueHeartRate(null) }
return heartRateObservable
    .switchMap { data ->
        Observable.interval(1000, TimeUnit.MILLISECONDS)
            .takeWhile { it < 10 } // ticks 0..9: replay for up to 10 seconds
            .map { data }          // each tick re-emits the latest reading
    }
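Note that the snippet above only stops replaying once the reading is stale; it does not send the NULL marker the question asks for. Below is a minimal sketch of one way to add it, not a definitive implementation. It assumes RxJava 2 package names (for RxJava 3 the packages start with io.reactivex.rxjava3). HeartRate, replayUntilStale and demo are hypothetical names introduced for illustration: HeartRate stands in for BLEDataValueHeartRate, and the Scheduler parameter exists only so the timing can be checked with virtual time. Unlike the snippet above, this sketch also emits each reading immediately instead of waiting one second.

```kotlin
import io.reactivex.Observable
import io.reactivex.Scheduler
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for BLEDataValueHeartRate; bpm == null marks "stale / no reading".
data class HeartRate(val bpm: Int?)

// For every new reading, switchMap cancels the previous ticker and starts a new one.
// The ticker fires at 0 s, 1 s, ..., 10 s after the reading: ticks 0..9 replay the
// reading, tick 10 emits the stale marker, and take(11) then stops the ticker.
fun replayUntilStale(source: Observable<HeartRate>, scheduler: Scheduler): Observable<HeartRate> =
    source.switchMap { reading ->
        Observable.interval(0, 1, TimeUnit.SECONDS, scheduler)
            .take(11)
            .map { tick -> if (tick < 10L) reading else HeartRate(null) }
    }

// Usage sketch with virtual time: one reading, then 15 seconds of silence.
fun demo(): List<HeartRate> {
    val scheduler = TestScheduler()
    val sensor = PublishSubject.create<HeartRate>()
    val observer = replayUntilStale(sensor, scheduler).test()
    sensor.onNext(HeartRate(72))
    scheduler.advanceTimeBy(15, TimeUnit.SECONDS)
    return observer.values() // ten replays of 72, then the stale marker
}
```

In production code you would presumably pass Schedulers.computation() as the scheduler. Because RxJava 2 does not allow null emissions, the marker is a wrapper object holding null, just like BLEDataValueHeartRate(null) in the question.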