asynchronous playframework playframework-2.0 playframework-2.2

Play framework async code does not wait for the result


I have three functions.

myFunc1:

def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
  kafkaManager.getConsumerIdentity(cluster, consumer) map { errorOrT =>
    errorOrT.fold(
      error => BadRequest(Json.obj("msg" -> error.msg)),
      T=> {
        Ok(Json.obj("summary" -> getSummary(cluster, consumer, T.Map.keys)))
      })
  }
}

getSummary:

def getSummary(cluster: String, consumer: String, myMap: Iterable[String]) = {
  var topics: Map[String, JsObject] = Map()
  myMap.map { key =>
    topicSummary(cluster, consumer, key).map(r => {
      r.fold(l => {}, value => {
        topics += key -> value
      })
    })
  }
  topics
}

and topicSummary:

def topicSummary(cluster: String, consumer: String, topic: String) = {
  kafkaManager.getConsumedTopicState(cluster, consumer, topic).map { errorOrTopicSummary =>
    errorOrTopicSummary.map(
      topicSummary => {
        Json.obj("totalLag" -> topicSummary.totalLag.getOrElse(None).toString(), "percentageCovered" -> topicSummary.percentageCovered)
    })
  }
}

The result is:

{"summary":()}

The problem is that getSummary does not wait for the result. I'd love to hear suggestions on how to fix it.


Solution

  • It's hard to tell exactly what's going on because you haven't put explicit return types on your functions. Assuming all the Kafka calls are asynchronous, what appears to be happening is that topicSummary returns a Future wrapping the JsObject, but getSummary, which calls it, does not wait for that Future to complete. The callbacks that add entries to topics run later, so getSummary returns the still-empty topics map immediately.
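
A minimal sketch of that failure mode, using only the Scala standard library. `slowLookup` and `brokenSummary` are hypothetical stand-ins for `topicSummary` and `getSummary`, not the real Kafka calls, and the `gate` parameter exists only to make the timing deterministic for the illustration.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global

object FireAndForgetDemo {
  // Hypothetical async lookup: its result only becomes available
  // after `gate` completes.
  def slowLookup(key: String, gate: Future[Unit]): Future[Int] =
    gate.map(_ => key.length)

  // Same shape as the question's getSummary: start the futures, register
  // callbacks that mutate a local var, then return the var straight away.
  def brokenSummary(keys: List[String], gate: Future[Unit]): Map[String, Int] = {
    var acc = Map.empty[String, Int]
    keys.foreach { key =>
      slowLookup(key, gate).foreach(value => acc += key -> value)
    }
    acc // returned before any callback has had a chance to run
  }
}
```

Calling `FireAndForgetDemo.brokenSummary(List("a", "bb"), Promise[Unit]().future)` returns an empty map, because no lookup has completed by the time `acc` is returned.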

    When you're dealing with Futures you need to either:

    • deal with Futures all the way through your code path (recommended)
    • explicitly Await a future to materialize its result (not recommended)
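
The two options above can be sketched with plain standard-library Futures. `fetchCount` is a hypothetical asynchronous call introduced for illustration, and the five-second timeout is an arbitrary choice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureOptionsDemo {
  // Hypothetical asynchronous source of a value.
  def fetchCount(): Future[Int] = Future(21)

  // Option 1 (recommended): stay inside Future and transform the
  // eventual value with map/flatMap. Nothing blocks here.
  def doubledAsync(): Future[Int] = fetchCount().map(_ * 2)

  // Option 2 (not recommended): block the calling thread until the
  // value exists, or throw if the timeout expires first.
  def doubledBlocking(): Int = Await.result(fetchCount(), 5.seconds) * 2
}
```

In a Play action, the first form fits `Action.async` directly, while the second ties up a request thread for as long as the lookup takes.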

    You can resolve this by implementing getSummary asynchronously, which would look something like this:

    myFunc1:

    def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
      // NB: Since we're dealing with a Future within a Future, we
      // use flatMap to combine them
      kafkaManager.getConsumerIdentity(cluster, consumer).flatMap { errorOrT =>
        errorOrT.fold(
          error => 
            // In the case of an error, return a no-op Future with .successful
            Future.successful(BadRequest(Json.obj("msg" -> error.msg))),
    
          // Otherwise, map over the results of get summary
          T => getSummary(cluster, consumer, T.Map.keys).map { topics =>
            Ok(Json.obj("summary" -> topics))
          }
        )
      }
    }
    

    getSummary (approximate code):

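    import scala.concurrent.Future

    // NB: an implicit ExecutionContext must also be in scope for map,
    // recover and Future.fold below
    def getSummary(cluster: String, consumer: String, myMap: Iterable[String]): Future[Map[String, JsObject]] = {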
    
      // For each key you expect a Future[JsObject], which you want to
      // transform into a tuple of key/object
      val topicKeys: List[Future[(String, JsObject)]] = myMap.toList.map { key =>
        topicSummary(cluster, consumer, key)
          // Map the future optional value with its key, defaulting
          // to an empty object
          .map(topicsOpt => key -> topicsOpt.getOrElse(Json.obj()))
    
          // OPTIONAL: handle any error with an empty object
          .recover {
            case _ => key -> Json.obj()
          }
      }
    
      // Finally, use Future.fold to wait for every future in the list
      // and fold the resulting (String, JsObject) tuples into a single
      // Future[Map]
      Future.fold(topicKeys)(Map.empty[String, JsObject])(_ + _)
    }
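
As a possible alternative to the fold step: `Future.fold` has been deprecated since Scala 2.12 in favour of `Future.foldLeft`, and `Future.traverse` followed by `toMap` expresses the same idea. The sketch below uses only the standard library; `summarize` and its `lookup` parameter are hypothetical names, with `lookup` standing in for `topicSummary` plus its default value.

```scala
import scala.concurrent.{ExecutionContext, Future}

object TraverseDemo {
  // Runs `lookup` for every key, pairs each result with its key, and
  // collects the pairs into one Future[Map] that completes once all
  // lookups have completed.
  def summarize[A](keys: Iterable[String])(lookup: String => Future[A])
                  (implicit ec: ExecutionContext): Future[Map[String, A]] =
    Future
      .traverse(keys.toList)(key => lookup(key).map(value => key -> value))
      .map(_.toMap)
}
```

If any single lookup fails, the combined Future fails too, so a per-key `recover` like the one above is still needed when partial results are acceptable.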
    

    It's really helpful when dealing with futures to be explicit about expected return types, at least when trying to understand intermediate states. If you end up with too many nested maps and flatMaps, investigate using for comprehensions to make things look cleaner, but that's another question.
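
For reference, a small sketch of what the for-comprehension style looks like next to nested flatMap/map. `findTopics` and `countTopics` are hypothetical asynchronous steps, loosely standing in for `getConsumerIdentity` and `getSummary`; both functions below are annotated with explicit return types, as suggested above.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object ForComprehensionDemo {
  // Hypothetical async steps used only for this illustration.
  def findTopics(consumer: String): Future[List[String]] =
    Future.successful(List(s"$consumer-a", s"$consumer-b"))

  def countTopics(topics: List[String]): Future[Int] =
    Future.successful(topics.size)

  // Nested style: flatMap for the step that returns a Future,
  // map for the final transformation.
  def nested(consumer: String): Future[String] =
    findTopics(consumer).flatMap { topics =>
      countTopics(topics).map(count => s"$consumer has $count topics")
    }

  // The same pipeline as a for-comprehension; the compiler rewrites
  // it into the flatMap/map calls above.
  def flat(consumer: String): Future[String] =
    for {
      topics <- findTopics(consumer)
      count  <- countTopics(topics)
    } yield s"$consumer has $count topics"
}
```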