mongodb, scala, monix

Monix Task with Mongo Collection: Error handling


I am trying to use Monix Task with mongo-scala-driver, and I am having some trouble understanding error handling.

    val mongoClient: Resource[Task, MongoConnection[Task, DomainModel]] =
      MongoTypedConnection.create[Task, DomainModel](
        "mongodb:...&authMechanism=SCRAM-SHA-1"
      )

    mongoClient.use { client =>
      val changeStream: Task[ChangeStreamObservable[DomainModel]] =
        for {
          collection <- client.getMongoCollection("myDatabase", "myCollection")
          changes    <- client.watchCollection(collection)
        } yield changes
        ...
        ...
        ...
        .as(ExitCode.Success)
    }

This works perfectly well when there are no errors. I want to add error handling to it (for example, to handle incorrect database and collection names). My initial attempt, based on the docs, was:

      val changeObs: io.Serializable =
        Await.result(changeStream
          .onErrorHandleWith {
            case _: TimeoutException =>
              // Oh, we know about timeouts, recover it
              Task.now("Recovered!")
            case other =>
              // We have no idea what happened, raise error!
              Task.raiseError(other)
          }.runToFuture, 5.seconds)

But this gives me an io.Serializable. How do I retain a ChangeStreamObservable[DomainModel] while also having some kind of neat error handling? I would appreciate pointers to any patterns that I could study.

BR


Solution

  • Turns out I was looking at this incorrectly.

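    A Task[ChangeStreamObservable[DomainModel]] already has a MonadError instance. What that essentially means for a noob like me is that a failure is not lost when the Task is composed with others: it propagates through the chain until something handles it. So the error handling can be done once, at the very end of the code base: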

          changeStream // or any other Task/Observable that is composed from this Task
            .onErrorHandle {
              // Could not reach the server in time
              case timeout: MongoTimeoutException =>
                logger.error(timeout.getMessage)
              // For example, an invalid database or collection name
              case illegal: java.lang.IllegalArgumentException =>
                logger.error(illegal.getMessage)
              // The server rejected the command, for example for lack of authorization
              case unauthorized: com.mongodb.MongoCommandException =>
                logger.error(unauthorized.getMessage)
            }
    

    I was trying to run the Task just to handle errors in the middle of the code base, thinking that if I composed multiple Tasks/Observables, I would lose the errors of the initial ones.
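
To make the point about propagation concrete, here is a minimal sketch that uses only Monix Task, with no MongoDB involved. FakeStream is a hypothetical stand-in for ChangeStreamObservable[DomainModel], and the object and value names are made up for illustration. It assumes Monix 3.x on the classpath. The failure raised in the first step passes through the later map/flatMap steps and is handled once at the end. Because every branch of the handler returns a Task[FakeStream], the result type is kept, unlike the attempt in the question, where one branch returned a Task[String] and the compiler widened the result to a common supertype.

```scala
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for ChangeStreamObservable[DomainModel]
final case class FakeStream(name: String)

object ErrorPropagationSketch {
  // First step fails, as a wrong collection name might
  val open: Task[FakeStream] =
    Task.raiseError(new IllegalArgumentException("bad collection name"))

  // Later steps are composed on top; they are skipped once `open` fails,
  // and the original error travels along with the composed Task
  val composed: Task[FakeStream] =
    open
      .map(s => s.copy(name = s.name.toUpperCase))
      .flatMap(s => Task.now(s))

  // Handle once, at the very end. Both branches yield a Task[FakeStream],
  // so the result type is not widened.
  val handled: Task[FakeStream] =
    composed.onErrorHandleWith {
      case _: IllegalArgumentException =>
        Task.now(FakeStream("fallback")) // recover with a value of the same type
      case other =>
        Task.raiseError(other)           // anything else stays an error
    }

  def main(args: Array[String]): Unit =
    println(Await.result(handled.runToFuture, 5.seconds)) // prints FakeStream(fallback)
}
```

Whether a fallback value makes sense for a real change stream depends on the application; re-raising after logging, as in the last branch, keeps the type without inventing a value.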