Search code examples
observablerx-scala

RxScala Observer and Subscription


I'm just getting started with RxScala and came across a few examples. I understand that there is an Observable contract in which the Observer trait is defined as follows:

trait Observer[T] {
  def onNext(event: T): Unit
  def onError(error: Throwable): Unit
  def onCompleted(): Unit
}

I can subscribe to an Observable stream of events using an Observer. So an Observer is anything that provides implementation for the 3 methods in the Observer trait.

Now, in the following example:

object TimedObservable extends App {
  val o = Observable.timer(1.second)
  o.subscribe(secs => println(" seconds is " + secs))
}

I'm creating an Observable event of type Long and in the subscribe call, I only have the onNext covered. What about onError and onCompleted?

Edit: What happens if I do not provide onError and onCompleted handlers? Will I violate any thread semantics?


Solution

  • I had a look at the API documentation:

    http://reactivex.io/rxscala/scaladoc/#rx.lang.scala.Observable

    Looks like there are overloaded versions of the subscribe method.