Scala Futures for-comprehension with a list of values


I need to call a method that returns a Future on every element of a list, with the calls running concurrently. My current implementation maps over the list, calls the method on each element and waits for its result before moving on, so the calls run sequentially and waste time.

My manager shared a link showing how to run Futures concurrently using a for-comprehension, but I cannot see how to apply this to my List.

The link he shared with me is https://alvinalexander.com/scala/how-use-multiple-scala-futures-in-for-comprehension-loop/

Here is my current code:

private def method1(id: String): Tuple2[Boolean, List[MyObject]] = {
    val workers = List.concat(idleWorkers, activeWorkers.keys.toList)
    var ready = true
    val workerStatus = workers.map{ worker =>
      // Blocks here for each worker in turn, so the calls run one after another
      val option = Await.result(method2(worker), 1 seconds)
      // Named state so it does not clash with the status string built below
      val state = if (option.isDefined) {
        if (option.get._2 == id) {
          option.get._1.toString
        } else {
          "INVALID"
        }
      } else "FAILED"
      val status = s"$worker: $state"
      // exists avoids calling .get on None when the worker did not reply
      if (option.exists(_._1)) {
        ready = false
      }
      MyObject(worker.toString, status)
    }.toList.filterNot(s => s.status.contains("INVALID"))
    (ready, workerStatus)
  }

  private def method2(worker: ActorRef): Future[Option[(Boolean, String)]] = Future{
      implicit val timeout: Timeout = 1 seconds;
      Try(Await.result(worker ? GetStatus, 1 seconds)) match {
            case Success(extractedVal) => extractedVal match {
                case res: (Boolean, String) => Some(res)
                case _ => None
            }
            case Failure(_) => { None }
            case _ => { None }
        }
  }

If someone could suggest how to implement for-comprehension in this scenario, I would be grateful. Thanks


Solution

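  • For method2 there is no need for the Future/Await mix. Just map the Future returned by ask (an implicit Timeout and ExecutionContext must be in scope), and recover if you still want a failed or timed-out ask to become None:

    def method2(worker: ActorRef): Future[Option[(Boolean, String)]] =
      (worker ? GetStatus).map {
        // Match on the tuple's elements; a (Boolean, String) type pattern is unchecked because of erasure
        case (flag: Boolean, id: String) => Some((flag, id))
        case _ => None
      }.recover {
        // Mirrors the Failure branch of the original Try: any failure yields None
        case _ => None
      }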
    

    For method1 you likewise need to map the result of method2 and do the processing inside the map. This makes workerStatus a List[Future[MyObject]]. Because each call to method2 starts its Future immediately, the requests to all workers run concurrently instead of one after another.

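    Then use Future.sequence(workerStatus) to turn the List[Future[MyObject]] into a Future[List[MyObject]]. You can then use map again to do the filtering and checking on that List[MyObject]; this runs once all the individual Futures have completed. The ready flag is best computed at this stage from the collected results, since mutating a var from inside several Futures is not thread-safe.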
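To illustrate the Future.sequence step in isolation, here is a minimal sketch using only the standard library. The names SequenceSketch, slowDouble and doubleAll are hypothetical and stand in for method2 and its caller; Thread.sleep only simulates latency and is not something you would do in real code on the global execution context.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical stand-in for an asynchronous call such as method2.
  def slowDouble(n: Int): Future[Int] = Future {
    Thread.sleep(200) // simulated latency, for illustration only
    n * 2
  }

  // Each Future starts as soon as it is created, so mapping the list
  // launches every call before any of them has finished.
  def doubleAll(inputs: List[Int]): Future[List[Int]] = {
    val pending: List[Future[Int]] = inputs.map(slowDouble)
    // Completes once every element has completed; input order is preserved.
    Future.sequence(pending)
  }

  def main(args: Array[String]): Unit = {
    val result = Await.result(doubleAll(List(1, 2, 3)), 5.seconds)
    println(result) // prints List(2, 4, 6)
  }
}
```

The important detail is that the list of Futures is built first and only then combined, so the calls overlap rather than queue up behind each other.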

    Ideally you would then return a Future from method1 to keep everything asynchronous. You could, if absolutely necessary, use Await.result at this point which would wait for all the asynchronous operations to complete (or fail).
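Putting the steps together, one possible shape for a fully asynchronous method1 is sketched below. It is an illustration under assumptions, not the asker's real code: workers are plain Strings instead of ActorRefs, method2 returns canned replies instead of asking an actor, the worker list is passed in as a parameter, and a missing reply is assumed to leave ready unchanged.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object Method1Sketch {
  final case class MyObject(worker: String, status: String)

  // Hypothetical stand-in for method2: canned replies instead of an actor ask.
  def method2(worker: String): Future[Option[(Boolean, String)]] = Future {
    worker match {
      case "w1" => Some((true, "job-1"))
      case "w2" => Some((false, "job-1"))
      case "w3" => Some((false, "other-job"))
      case _    => None
    }
  }

  def method1(id: String, workers: List[String]): Future[(Boolean, List[MyObject])] = {
    // One Future per worker; all of them are started by this map.
    val perWorker: List[Future[(Boolean, MyObject)]] = workers.map { worker =>
      method2(worker).map { reply =>
        val state = reply match {
          case Some((flag, jobId)) if jobId == id => flag.toString
          case Some(_)                            => "INVALID"
          case None                               => "FAILED"
        }
        // Assumption: a worker with no reply does not count as busy.
        val busy = reply.exists(_._1)
        (busy, MyObject(worker, s"$worker: $state"))
      }
    }

    // Combine the results once every Future has completed.
    Future.sequence(perWorker).map { results =>
      val ready = !results.exists(_._1) // no shared var is mutated
      val statuses = results.map(_._2).filterNot(_.status.contains("INVALID"))
      (ready, statuses)
    }
  }

  def main(args: Array[String]): Unit = {
    // Blocking is confined to the outermost caller.
    val (ready, statuses) =
      Await.result(method1("job-1", List("w1", "w2", "w3", "w4")), 5.seconds)
    println(ready)
    statuses.foreach(println)
  }
}
```

Each per-worker Future carries its own busy flag alongside the MyObject, so ready can be derived in the final map without any mutable state shared between threads.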