Search code examples
scalarx-javarx-scala

Observable Exception handling


I'm learning RxScala and came to this very synthetic snippet. I'm trying to handle exception in onError block:

def doLongOperation():String = {
  (1 to 10).foreach {
    _ =>
      Thread.sleep(100)
      print(".")
  }
  println()
  if (System.currentTimeMillis() % 2 == 0) {
    throw new RuntimeException("Something went wrong during long operation")
  }
  s"OK"
}

def main(args: Array[String]) {
  println("Changing status:  -> doing task1")
  Observable.just(
    doLongOperation()
  ).subscribe(
    str => println("Changing status: doing task1 -> doing task2"),
    throwable => println(s"Failed: ${throwable.getMessage}"),     //never get here
    () => println("Completed part")
  )
}

In case of exception I expect something like:

Failed: Something went wrong during long operation

But what I get is:

.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation
at stats.STest$.doLongOperation(STest.scala:20)
at stats.STest$.main(STest.scala:49)
at stats.STest.main(STest.scala)

What am I missing? Should I 'manually' call onError at observer? Appreciate for any help.


Solution

  • The problem is the misinterpretation of the just(). It takes an existing value the time the sequence is assembled and not a method which gets executed when a subscriber subscribes. In other terms, your code does this:

    var tempValue = doLongOperation();
    
    Observable.just(tempValue).subscribe(...)
    

    and throws way before an Observable is even created.

    (Sorry, I don't know Scala or RxScala enough so excuse my Java 8 examples.)

    I don't know how far RxScala is behind RxJava, but RxJava 1.0.15 has a new factory method, fromCallable that lets you defer a single value:

    Observable.fromCallable(() -> doLongOperation()).subscribe(...)
    

    The alternative is to wrap your original source into a defer so when doLongOperation throws, it gets routed to the subscriber:

    Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...)