I am trying to work with a collection of futures and am having trouble returning a result from a def based on the futures' status. Below is my code:
final case class StagesToRun(stages: Set[StageRun])

private def processNextStagesAndAccumulateResults(stagesToRun: StagesToRun): \/[Exception, Success] = {
  val stageProcessingExceptions = mutable.Set[Exception]()
  // processor.process(stagesToRun) returns a Set[Future[\/[Exception, Success]]];
  // Future.sequence converts it to a Future[Set[\/[Exception, Success]]]
  val processResults = Future.sequence(processor.process(stagesToRun))
  processResults.onSuccess {
    case result => {
      result.map { res =>
        res.fold(
          l => stageProcessingExceptions += l,
          r => r
        )
      }
      if (stageProcessingExceptions.isEmpty) Success.right
      else new Exception("Got exception while processing one of the stage").left
    }
  }
  processResults.onFailure {
    case ex => new Exception(ex.getMessage).left
  }
}
Now, as per Scala conventions, the last expression of my function becomes its return value. In this function that should be either the output of if (stageProcessingExceptions.isEmpty) Success.right and its corresponding else, or the outcome of onFailure, i.e. new Exception(ex.getMessage).left. However, the compiler keeps telling me that the return type is Unit rather than the expected disjunction. Can someone please help me here? Thanks
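You are absolutely correct that the last expression of a function becomes its return value. However, if you look at the definitions of onSuccess and onFailure, both of them have Unit as their return type. The last expression in your method is the call to processResults.onFailure, so the method body evaluates to Unit. The values you build inside the callbacks are simply discarded.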
From the Scala standard library source, the definition of onSuccess is:

def onSuccess[U](pf: PartialFunction[T, U])(implicit executor: ExecutionContext): Unit = onComplete {
  case Success(v) =>
    pf.applyOrElse[T, Any](v, Predef.identity[T]) // Exploiting the cached function to avoid MatchError
  case _ =>
}

Along the same lines, onFailure also returns Unit:

def onFailure[U](@deprecatedName('callback) pf: PartialFunction[Throwable, U])(implicit executor: ExecutionContext): Unit = onComplete {
  case Failure(t) =>
    pf.applyOrElse[Throwable, Any](t, Predef.identity[Throwable]) // Exploiting the cached function to avoid MatchError
  case _ =>
}
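The difference in return types can be seen in a minimal sketch like the one below. It uses only the standard library; the object and value names (CallbackVsMap, answer, registered, mapped) are made up for illustration and are not part of your code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical names, used only to contrast a callback with map.
object CallbackVsMap {
  val answer: Future[Int] = Future.successful(41)

  // A callback only registers a side effect: the expression has type Unit,
  // and whatever the function computes is thrown away.
  val registered: Unit = answer.onComplete(_.map(_ + 1))

  // map returns a new Future that carries the transformed value.
  val mapped: Future[Int] = answer.map(_ + 1)
}
```

The same reasoning applies to onSuccess and onFailure, which are thin wrappers around onComplete.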
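In your situation, you can apply map to the future instead of registering onSuccess and onFailure callbacks. Unlike a callback, map returns a new Future carrying the transformed value, so your required type is propagated. Note that the method then has to return Future[\/[Exception, Success]]; to get a plain \/[Exception, Success] you would have to block on the future (for example with Await.result), which is usually best left to the caller. Also, if you want to handle the case where the future itself fails, you can add a recover block to it: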
.recover {
  case _ =>
    // whatever value you want to propagate, e.g. a Left(...)
}
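Putting map and recover together, a rough sketch could look like the following. To keep it self-contained it assumes the standard library's Either in place of scalaz's \/, and a hypothetical Done marker in place of your Success type; StageResults, StageResult and accumulate are illustrative names too, and the set of futures stands in for the result of processor.process(stagesToRun).

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object StageResults {
  // Hypothetical stand-in for the question's Success marker.
  case object Done

  // Assumption: Either replaces scalaz's \/ so the sketch needs no extra library.
  type StageResult = Either[Exception, Done.type]

  // Combines the per-stage futures and propagates one overall result.
  def accumulate(results: Set[Future[StageResult]]): Future[StageResult] =
    Future.sequence(results)
      .map { outcomes =>
        // Collect the failures immutably instead of filling a mutable.Set.
        val failures = outcomes.collect { case Left(e) => e }
        val combined: StageResult =
          if (failures.isEmpty) Right(Done)
          else Left(new Exception(s"${failures.size} stage(s) failed"))
        combined
      }
      .recover {
        // Reached only when one of the futures itself failed.
        case ex => Left(new Exception(ex.getMessage))
      }
}
```

A caller that really needs a plain value can block at the edge of the program, for example with Await.result(StageResults.accumulate(futures), 5.seconds), where futures is whatever set of stage futures you have; inside the processing code it is generally preferable to keep working with the Future.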