scala rx-java rx-scala

Terminate Observable by timeout


I'm trying to limit the lifetime of an observable with a timeout:

def doLongOperation() = {
   Thread.sleep(duration)
   "OK"
}

def firstStep = Observable.create(
  (observer: Observer[String]) => {
    observer.onNext(doLongOperation())
    observer.onCompleted()
    Subscription()
  }
)

firstStep
  .timeout(1 second)
  .subscribe(
    item => println(item),
    throwable => throw throwable,
    () => println("complete")
  ) 

I would like to distinguish between the following outcomes:

  1. Observable finished by timeout, no result obtained
  2. Exception thrown during execution
  3. Execution finished successfully, return value

I can handle cases 2 and 3 without problems in the onNext and onError callbacks, but how do I detect that the observable was terminated by the timeout?

One more thing: I never reach the onCompleted callback, even though my code calls observer.onCompleted(). Why?


Solution

  • If a timeout happens, the TimeoutException is emitted on the computation thread, where the throw throwable in your onError callback ends up being ignored; your main thread won't and can't see it. You can add toBlocking after the timeout so that any exception ends up on the same thread:

    firstStep
      .timeout(1 second)
      .toBlocking
      .subscribe(
        item => println(item),
        throwable => println(throwable),
        () => println("complete")
      )
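
To tell the timeout apart from other failures, one option is to match on the type of the throwable passed to onError. The following is a minimal sketch, assuming the timeout surfaces as a java.util.concurrent.TimeoutException as described above. The Outcome object and the describeFailure helper are hypothetical names introduced for illustration; they are not part of RxScala.

```scala
import java.util.concurrent.TimeoutException

// Hypothetical helper (not an RxScala API): turns the Throwable received
// in onError into a label for the outcome.
object Outcome {
  def describeFailure(t: Throwable): String = t match {
    // Case 1: the observable was cut off by the timeout.
    case _: TimeoutException => "timed out, no result obtained"
    // Case 2: any other exception thrown during execution.
    case other               => s"failed: ${other.getMessage}"
  }
}
```

It could then be plugged into the error callback, for example throwable => println(Outcome.describeFailure(throwable)), while case 3 (a successful result) is still handled in onNext.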