I have three functions.
myFunc1:
def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
kafkaManager.getConsumerIdentity(cluster, consumer) map { errorOrT =>
errorOrT.fold(
error => BadRequest(Json.obj("msg" -> error.msg)),
T=> {
Ok(Json.obj("summary" -> getSummary(cluster, consumer, T.Map.keys)))
})
}
getSummary:
def getSummary(cluster: String, consumer: String, myMap: Iterable[String]) = {
var topics: Map[String, JsObject] = Map()
myMap.map { key =>
topicSummary(cluster, consumer, x).map(r => {
r.fold(l => {}, value => {
topics += key -> value
})
})
}
topics
}
and topicSummary:
def topicSummary(cluster: String, consumer: String, topic: String) = {
kafkaManager.getConsumedTopicState(cluster, consumer, topic).map { errorOrTopicSummary =>
errorOrTopicSummary.map(
topicSummary => {
Json.obj("totalLag" -> topicSummary.totalLag.getOrElse(None).toString(), "percentageCovered" -> topicSummary.percentageCovered)
})
}
}
the result is :
{"summary":()}
the problem is that getSummary did not wait to result. I'd love to hear suggestions how to fix it
It's hard to tell exactly what's going on because you haven't put explicit return types on your functions. Assuming all the Kafka calls are asynchronous, what appears to be happening is that topicSummary
returns a Future[JsObject]
, but getSummary
, which calls it, does not wait for its result but instead returns the (empty) topics map immediately.
When you're dealing with Futures you need to either:
Await
a future to materialize its result (not recommended)You can resolve this by implementing getSummary
asynchronously, which would look something like this:
myFunc1:
def myFunc1(cluster: String, consumer: String) = Action.async { implicit request =>
// NB: Since we're dealing with a Future within a Future, we
// use flatMap to combine them
kafkaManager.getConsumerIdentity(cluster, consumer).flatMap { errorOrT =>
errorOrT.fold(
error =>
// In the case of an error, return a no-op Future with .successful
Future.successful(BadRequest(Json.obj("msg" -> error.msg))),
// Otherwise, map over the results of get summary
T => getSummary(cluster, consumer, T.Map.keys).map { topics =>
Ok(Json.obj("summary" -> topics))
}
)
}
}
getSummary (approximate code):
def getSummary(cluster: String, consumer: String, myMap: Iterable[String]): Future[Map[String, JsObject]] = {
// For each key you expect a Future[JsObject], which you want to
// transform into a tuple of key/object
val topicKeys: List[Future[(String, JsObject)]] = myMap.toList.map { key =>
topicSummary(cluster, consumer, key)
// Map the future optional value with its key, defaulting
// to an empty object
.map(topicsOpt => key -> topicsOpt.getOrElse(Json.obj()))
// OPTIONAL: handle any error with an empty object
.recover {
case _ => key -> Json.obj()
}
}
// Finally, use Future.fold to turn a list of futures into a
// single Future sequence, then combine the (String, JsObject)
// tuples into a map
Future.fold(topicKeys)(Map.empty[String, JsObject])(_ + _)
}
It's really helpful when dealing with futures to be explicit about expected return types, at least when trying to understand intermediate states. If you end up with too many nested maps and flatMaps, investigate using for
comprehensions to make things look cleaner, but that's another question.