I am trying to use Monix Task
with mongo-scala-driver
. I have some trouble understanding Error Handling
val mongoClient: Resource[Task, MongoConnection[Task, DomainModel]] =
MongoTypedConnection.create[Task, DomainModel](
"mongodb:...&authMechanism=SCRAM-SHA-1"
)
mongoClient.use { client =>
val changeStream: Task[ChangeStreamObservable[DomainModel]] =
for {
collection <- client.getMongoCollection("myDatabase", "myCollection")
changes <- client.watchCollection(collection)
} yield changes
...
...
...
.as(ExitCode.Success)
}
This works perfectly well when there are no errors. I want to add error handling to this, (for example to handle incorrect database
and collection
names). My initial attempt based on the docs is to try:
val changeObs: io.Serializable =
Await.result(changeStream
.onErrorHandleWith {
case _: TimeoutException =>
// Oh, we know about timeouts, recover it
Task.now("Recovered!")
case other =>
// We have no idea what happened, raise error!
Task.raiseError(other)
}.runToFuture, 5.seconds)
But this gives me an io.Serializable
. How do I retain a ChangeStreamObservable[DomainModel]
while also having some kind of neat error handling? Appreciate pointers to any patterns that I could study.
BR
Turns out I was looking at this incorrectly.
Task[ChangeStreamObservable[DomainModel]]
already has a MonadError
. What that essentially means for a noob like me is that it does not lose errors. So this can be done at the very end of the code base:
changeStream //Or any other Task/Observable which (is composed)composes (from)this Task
.onErrorHandle {
case timeout: MongoTimeoutException =>
logger.error(timeout.getMessage)
case illegal: java.lang.IllegalArgumentException =>
logger.error(illegal.getMessage)
case unauthorized: com.mongodb.MongoCommandException =>
logger.error(unauthorized.getMessage)
I was trying to run the Task just to handle errors in the middle of a code base, thinking that If I compose multiple Tasks/Observable, I would lose the errors of the initial ones.