I'm learning RxScala and came to this very synthetic snippet. I'm trying to handle exception in onError block:
def doLongOperation():String = {
(1 to 10).foreach {
_ =>
Thread.sleep(100)
print(".")
}
println()
if (System.currentTimeMillis() % 2 == 0) {
throw new RuntimeException("Something went wrong during long operation")
}
s"OK"
}
def main(args: Array[String]) {
println("Changing status: -> doing task1")
Observable.just(
doLongOperation()
).subscribe(
str => println("Changing status: doing task1 -> doing task2"),
throwable => println(s"Failed: ${throwable.getMessage}"), //never get here
() => println("Completed part")
)
}
In case of exception I expect something like:
Failed: Something went wrong during long operation
But what I get is:
.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation
at stats.STest$.doLongOperation(STest.scala:20)
at stats.STest$.main(STest.scala:49)
at stats.STest.main(STest.scala)
What am I missing? Should I 'manually' call onError at observer? Appreciate for any help.
The problem is the misinterpretation of the just(). It takes an existing value the time the sequence is assembled and not a method which gets executed when a subscriber subscribes. In other terms, your code does this:
var tempValue = doLongOperation();
Observable.just(tempValue).subscribe(...)
and throws way before an Observable is even created.
(Sorry, I don't know Scala or RxScala enough so excuse my Java 8 examples.)
I don't know how far RxScala is behind RxJava, but RxJava 1.0.15 has a new factory method, fromCallable
that lets you defer a single value:
Observable.fromCallable(() -> doLongOperation()).subscribe(...)
The alternative is to wrap your original source into a defer
so when doLongOperation
throws, it gets routed to the subscriber:
Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...)