To test my reactive program with RxScala, I need to construct an Observable like this:
val numberStream: Observable[Int] = Observable.???()
which emits 1, waits 1s, emits 4, waits 3s, emits 2, and then waits 2s.
I have an ugly solution using a Thread and a ReplaySubject:
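import rx.lang.scala.{Observable, Subject}
import rx.lang.scala.subjects.ReplaySubject

val subject: Subject[Int] = ReplaySubject[Int]()
val numberStream: Observable[Int] = subject
// Emit each value on a background thread, sleeping between emissions.
new Thread(new Runnable {
  def run(): Unit = {
    subject.onNext(1)
    Thread.sleep(1000)
    subject.onNext(4)
    Thread.sleep(3000)
    subject.onNext(2)
    Thread.sleep(2000)
  }
}).start()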
Is there any better solution?
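You could concatenate several Observables with delays, with the final one being an empty Observable with a subscription delay. Because ++ subscribes to each Observable only after the previous one completes, each delay is measured from the previous emission.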
import scala.concurrent.duration._

val numberStream = (
  Observable.just(1) ++
  Observable.just(4).delay(1.second) ++
  Observable.just(2).delay(3.seconds) ++
  Observable.empty.delaySubscription(2.seconds))
As a side note, if you are testing, you should use a TestScheduler, which you can pass as the second argument to delay.
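A minimal sketch of the TestScheduler approach follows, assuming RxScala's `rx.lang.scala.schedulers.TestScheduler` and the two-argument overloads of `delay` and `delaySubscription`. The object name `TimedStreamSketch` and the `received` and `completed` variables are made up for illustration. The test advances virtual time instead of sleeping, so it runs instantly.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.TestScheduler

// Hypothetical test harness; the names here are not from the original post.
object TimedStreamSketch {
  def main(args: Array[String]): Unit = {
    val scheduler = TestScheduler()

    // Same stream as above, but every delay runs on the virtual-time scheduler.
    val numberStream: Observable[Int] =
      Observable.just(1) ++
      Observable.just(4).delay(1.second, scheduler) ++
      Observable.just(2).delay(3.seconds, scheduler) ++
      Observable.empty.delaySubscription(2.seconds, scheduler)

    val received = ListBuffer.empty[Int]
    var completed = false
    numberStream.subscribe(
      n => received += n,
      e => throw e,
      () => completed = true)

    // The first value is emitted synchronously on subscription.
    assert(received.toList == List(1))

    // After 1 virtual second, 4 has arrived.
    scheduler.advanceTimeBy(1.second)
    assert(received.toList == List(1, 4))

    // After 3 more virtual seconds, 2 has arrived, but the stream is still open.
    scheduler.advanceTimeBy(3.seconds)
    assert(received.toList == List(1, 4, 2))
    assert(!completed)

    // After the final 2 virtual seconds, the stream completes.
    scheduler.advanceTimeBy(2.seconds)
    assert(completed)
  }
}
```

Nothing in this sketch blocks a real thread, so the assertions check the timing deterministically rather than depending on the wall clock.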