
RxScala Observables with replay

I'm trying to understand replay in RxScala. I create an observable like this:

lazy val toyObservable : Observable[Int] = {
    val coldObservable : Observable[Int] = intPerSecond
    val hotObservable : ConnectableObservable[Int] = coldObservable.publish
    val cachedObservable = hotObservable //.replay(3)   // <<< ODD THING
    cachedObservable
}

where intPerSecond emits one integer per second, starting at 0. The first observer to subscribe indeed sees one integer per second. If a second observer joins at t=6 seconds, then from that point they both see a matching stream 6...7...8...9... at one-second intervals. That's as expected.

Now if I add the .replay(3), I'd expect the second observer to see 3456...7...8...9 when it joins, i.e. it would immediately get 3 integers from the cache and then receive the rest at one per second as they are produced. But instead, neither observer now sees anything. Do I have the syntax wrong?


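  • You forgot to call hotObservable.connect. Both publish and replay return a ConnectableObservable, which does not start emitting until connect is called. The following code outputs exactly what you want: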

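    import rx.lang.scala._
    import rx.lang.scala.observables._
    import scala.concurrent.duration._

    val intPerSecond = Observable.interval(1.seconds).map(_.toInt)
    val coldObservable : Observable[Int] = intPerSecond
    val hotObservable : ConnectableObservable[Int] = coldObservable.publish
    val cachedObservable = hotObservable.replay(3)
    // publish and replay both return ConnectableObservables; nothing flows until they are connected
    cachedObservable.connect
    hotObservable.connect
    cachedObservable.foreach(i => println(s"1: $i"))
    Thread.sleep(6000)  // let the second observer join about 6 seconds later
    cachedObservable.foreach(i => println(s"2: $i"))
    Thread.sleep(4000)  // keep the JVM alive; interval emits on a background thread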
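To make the expected behaviour concrete without waiting on timers, here is a rough, library-free sketch of what a connectable stream with a replay buffer of 3 does. It is only a simplified model, not how RxScala implements replay: ReplayModel, emit and ReplayModelDemo are hypothetical names made up for this illustration, and values are pushed by hand instead of arriving once per second.

```scala
import scala.collection.mutable

// Hypothetical model of a connectable stream with a bounded replay buffer.
// This is NOT the RxScala implementation, just an illustration of the semantics.
final class ReplayModel[T](bufferSize: Int) {
  private val buffer = mutable.Queue.empty[T]
  private val observers = mutable.ListBuffer.empty[T => Unit]
  private var connected = false

  // Until connect() is called, emitted values are dropped.
  def connect(): Unit = { connected = true }

  // A new observer first receives the cached values, then the live ones.
  def subscribe(onNext: T => Unit): Unit = {
    buffer.foreach(onNext)
    observers += onNext
  }

  // Stands in for the source producing a value (assumed to be called by hand here).
  def emit(value: T): Unit = if (connected) {
    buffer.enqueue(value)
    if (buffer.size > bufferSize) buffer.dequeue() // keep only the newest bufferSize values
    observers.foreach(observer => observer(value))
  }
}

object ReplayModelDemo {
  // Returns what the first and the second observer saw.
  def run(): (List[Int], List[Int]) = {
    val first = mutable.ListBuffer.empty[Int]
    val second = mutable.ListBuffer.empty[Int]
    val stream = new ReplayModel[Int](3)

    stream.subscribe(x => { first += x; () })
    stream.connect()
    (0 to 5).foreach(stream.emit)              // values produced before the second observer joins
    stream.subscribe(x => { second += x; () }) // joins late and gets 3, 4, 5 from the buffer
    (6 to 9).foreach(stream.emit)

    (first.toList, second.toList)
  }
}
```

In this model the first observer sees 0 through 9, and the late observer sees 3, 4, 5 immediately followed by 6 through 9, which matches the 3456...7...8...9 pattern from the question. If connect() is never called, neither observer receives anything, which is the symptom described above.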