Search code examples
javascalareactive-programmingrx-scalareactivex

How to construct a Observable with custom numbers and different delays?


In order to test my reactive program with rxscala, I need to construct such an Observable:

val numberStream: Observable[Int] = Observable.???()

which

  1. publishes number 1
  2. then waits for 1s
  3. publishes number 4
  4. then waits for 3s
  5. publishes number 2
  6. then waits for 2s

I have a ugly solution, with Thread and ReplaySubject:

val subject: Subject[Int] = ReplaySubject()
val numberStream: Observable = subject

new Thread(new Runnable {
    def run = {
        subject.onNext(1)
        Thread.sleep(1000)
        subject.onNext(4)
        Thread.sleep(3000)
        subject.onNext(2)
        Thread.sleep(2000)
    }
}).start()

Is there any better solution?


Solution

  • You could concatenate together several Observable with delays, with the final Observable being and empty with a subscription delay.

    val numberStream = (
         Observable.just(1) ++ 
         Observable.just(4).delay(1.second) ++
         Observable.just(2).delay(3.second) ++
         Observable.empty.delaySubscription(2.second))
    

    As a side note, if you are testing you should be using a TestScheduler which you can pass as the second argument to delay.