scala task future monix

Monix task handle failure for list of futures


How can I handle a failure during the asynchronous execution of a task? That is, I want to at least print the stack trace and shut down. The code below seems to wait forever for input > 5.

import monix.eval.Task
import scala.util.Try

implicit val scheduler = monix.execution.Scheduler.global

val things = Range(1, 40)

def t(i: Int) = Task.eval {
  Try {
    Thread.sleep(1000)
    val result = i + 1
    if (result > 5) {
      throw new Exception("asdf")
    }
    // e.g. write to a file; println stands in for that side effect
    println(result) // Effect
    "Result"
  }
}

// Note: despite the name, these are Tasks until runToFuture is called
val futures = things.map(e => t(e))
futures.foreach(_.runToFuture)

edit

trying:

import scala.util.{Failure, Success}

futures.foreach(_.runToFuture.onComplete {
  case Success(value) =>
    println(value)
  case Failure(ex) =>
    System.err.println(ex)
    System.exit(1)
})

will not stop the computation. How can I log the stack trace and cancel the ongoing computations and stop?


Solution

  • A more idiomatic approach would be to use Observable instead of Task, since the code is dealing with a list of data (I'm assuming that's the use case, since that is what the question shows).

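    import monix.eval.Task
    import monix.reactive.Observable
    import scala.util.control.NonFatal

    // Assumes an implicit Scheduler is in scope, as in the question
    val obs = Observable
      .fromIterable(Range(1, 40))
      .mapEval(i =>
        if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
        else Task.delay(println(i)) // Or write to file in your case
      )
      .completedL
      .runToFuture

    // obs is a Future, so a failed stream can be handled with recover
    obs
      .recover {
        case NonFatal(e) => println("Error")
      }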
    

    Alternatively, you can signal the error with Either, which leads to better type safety, since the caller then has to handle the Either result.

    val obs = Observable
      .fromIterable(Range(1, 40))
      .mapEval(i =>
        if (i + 1 > 5) Task.pure(Left("Error"))
        else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
      )
      .takeWhileInclusive(_.isRight) // will also emit the failing result
      .lastL
      .runToFuture
    
    
    obs.map {
      case Left(err) => println("There's an error")
      case _ => println("Completed successfully")
    }
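
The same error-as-value idea probably explains why the onComplete attempt in the question never reaches its Failure branch: wrapping the task body in Try turns the exception into an ordinary value, so the resulting future completes successfully and System.exit is never called. Below is a minimal sketch of that effect. It uses only scala.concurrent.Future as a dependency-free stand-in for the future returned by runToFuture (an assumption made for illustration), and the names TryInsideFuture, wrapped, unwrapped and outcome are hypothetical.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object TryInsideFuture {
  // Mirrors the question's task body: Try catches the exception
  // and returns it as a value, so the future itself succeeds.
  def wrapped(i: Int): Future[Try[String]] = Future {
    Try {
      if (i + 1 > 5) throw new Exception("asdf")
      "Result"
    }
  }

  // Same logic without Try: the exception fails the future itself.
  def unwrapped(i: Int): Future[String] = Future {
    if (i + 1 > 5) throw new Exception("asdf")
    "Result"
  }

  // Describes which branch an onComplete handler would take.
  def outcome[A](f: Future[A]): String = {
    Await.ready(f, 5.seconds)
    f.value match {
      case Some(Success(v)) => s"Success($v)"
      case Some(Failure(e)) => s"Failure(${e.getMessage})"
      case None             => "not completed"
    }
  }

  def main(args: Array[String]): Unit = {
    println(outcome(wrapped(10)))   // takes the Success branch, carrying a Failure value
    println(outcome(unwrapped(10))) // takes the Failure branch
  }
}
```

If this reading is right, dropping the Try (or signalling the failure with Task.raiseError, as in the first example above) is what lets a Failure handler see the exception.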