Akka Stream use HttpResponse in Flow


I would like to use a simple Flow to fetch some extra data from an HTTP service and enrich my data object with the result. The following illustrates the idea:

val httpClient = Http().superPool[User]()

val cityRequest = Flow[User].map { user=>
  (HttpRequest(uri = Uri(config.getString("cityRequestEndpoint"))), user)
}

val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), user) => user
  case (Success(resp), user) => {
    // << What to do here to get the value? >> //
    val responseData = ??? // process the response somehow to get a value
    val enhancedUser = new EnhancedUser(user.data, responseData)
    enhancedUser
  }
}

val processEnhancedUser = Flow[EnhancedUser].map {
  // e.g.: Asynchronously save user to a database
}

val useEnhancementGraph = userSource
  .via(cityRequest)
  .via(httpClient)
  .via(cityResponse)
  .via(processEnhancedUser)
  .to(Sink.foreach(println))

I have trouble understanding the mechanics of, and the difference between, the streaming nature and materialization / Futures inside the Flow.

Following ideas did not explain it to me:

How do I get the value from the response into the new user object so that I can handle that object in the following steps?

Thanks for help.

Update:

I evaluated the code against a remote Akka HTTP server that answers requests after anywhere between zero and 10 seconds, using the code below for parsing. As a result, some "EnhancedUser" instances showed up at the end, but the ones whose responses took too long were missing their values.

At some point I added .async to the end of the cityResponse parser; the output then took longer to appear, but was correct.

What is the reason for that behaviour, and how does it fit together with the accepted answer?

val cityResponse = Flow[(Try[HttpResponse], User)].map {
  case (Failure(ex), member) => member
  case (Success(response), member) => {
    Unmarshal(response.entity).to[String] onComplete {
      case Success(s) =>  member.city = Some(s)
      case Failure(ex) => member.city = None
    }
  }
  member
}.async  // <<-- This changed the behavior to be correct, why?

Solution

    There are two different strategies you could use depending on the nature of the entity you are getting from "cityRequestEndpoint":

    Stream Based

    The typical way to handle this situation is to always assume that the entity coming from the source endpoint can contain N pieces of data, where N is not known in advance. This is usually the pattern to follow because it is the most generic and therefore "safest" in the real world.

    The first step is to convert the HttpResponse coming from the endpoint into a Source of data:

    val convertResponseToByteStrSource : (Try[HttpResponse], User) => Source[(Option[ByteString], User), _] = 
      (response, user) => response match {
        case Failure(_) => Source single (None -> user)
        case Success(r) => r.entity.dataBytes map (byteStr => Some(byteStr) -> user)
      }
    

    The code above makes no assumption about the size of N: r.entity.dataBytes could be a Source of 0 ByteString values or, potentially, an infinite number of values. But our logic doesn't care!

    Now we need to combine the data coming from the Source. This is a good use case for Flow.flatMapConcat which takes a Flow of Sources and converts it into a Flow of values (similar to flatMap for Iterables):

    val cityByteStrFlow : Flow[(Try[HttpResponse], User), (Option[ByteString], User), _] = 
      Flow[(Try[HttpResponse], User)].flatMapConcat(convertResponseToByteStrSource.tupled) // .tupled adapts the two-argument function to the tuple elements
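
The flatMap analogy above can be made concrete with plain Scala collections. This is only a sketch of the shape of the transformation, not Akka code: a "response" is modelled as an optional list of String chunks (None standing for a failed request), and User, responses and toChunks are hypothetical stand-ins.

```scala
object FlatMapAnalogy {
  // Hypothetical stand-in for the question's User type.
  final case class User(name: String)

  // Each element pairs an optional list of chunks (None = failed request) with its user.
  val responses: List[(Option[List[String]], User)] = List(
    (Some(List("Ber", "lin")), User("alice")), // body delivered in two chunks
    (None, User("bob")),                       // failed request
    (Some(Nil), User("carol"))                 // successful request with an empty body
  )

  // Mirrors the shape of convertResponseToByteStrSource: one output element per chunk,
  // or a single (None, user) element when the request failed.
  def toChunks(response: Option[List[String]], user: User): List[(Option[String], User)] =
    response match {
      case None         => List(None -> user)
      case Some(chunks) => chunks.map(chunk => Some(chunk) -> user)
    }

  // flatMap flattens the per-response lists into one list, as flatMapConcat does for Sources.
  val flattened: List[(Option[String], User)] =
    responses.flatMap { case (response, user) => toChunks(response, user) }

  def main(args: Array[String]): Unit = flattened.foreach(println)
}
```

In this analogy a body that arrives in two chunks yields two elements for the same user, and an empty body yields none, so the number of downstream elements follows the number of chunks rather than the number of users.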
    

    All that is left to do is convert the tuples of (Option[ByteString], User) into EnhancedUser. Note: I am assuming below that User is a subclass of EnhancedUser, which is inferred from the question logic:

    val convertByteStringToUser : (Option[ByteString], User) => EnhancedUser = 
      (byteStr, user) => 
        byteStr
          .map(s => EnhancedUser(user.data, s))
          .getOrElse(user)
    
    val cityUserFlow : Flow[(Option[ByteString], User), EnhancedUser, _] = 
      Flow[(Option[ByteString], User)].map(convertByteStringToUser.tupled) // .tupled adapts the two-argument function to the tuple elements
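
A side note on the function signatures used in this section: a value of type (A, B) => C takes two separate arguments, while the elements flowing through the stream are single tuples. Function2.tupled is the standard-library way to adapt one to the other. A minimal sketch with plain collections, where describe and pairs are hypothetical names:

```scala
object TupledSketch {
  // A two-argument function, shaped like convertByteStringToUser.
  val describe: (Option[String], String) => String =
    (city, name) => city.map(c => s"$name@$c").getOrElse(name)

  // Elements are tuples, as they are inside the stream.
  val pairs: List[(Option[String], String)] =
    List((Some("Berlin"), "alice"), (None, "bob"))

  // describe.tupled has type ((Option[String], String)) => String,
  // which is what map over a collection (or Flow) of tuples expects.
  val described: List[String] = pairs.map(describe.tupled)

  def main(args: Array[String]): Unit = described.foreach(println)
}
```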
    

    These components can now be combined:

    val useEnhancementGraph =
      userSource
        .via(cityRequest)
        .via(httpClient)
        .via(cityByteStrFlow)
        .via(cityUserFlow)
        .via(processEnhancedUser)
        .to(Sink foreach println)
    

    Future Based

    We can use Futures to solve the problem, similar to the Stack Overflow question you referenced in your original question. I don't recommend this approach for two reasons:

    1. It assumes only 1 ByteString is coming from the endpoint. If the endpoint sends multiple values as ByteStrings then they all get concatenated together and you could get an error when creating EnhancedUser.
    2. It places an artificial timeout on the materialization of the ByteString data, similar to Async.await (which should almost always be avoided).

    To use the Future-based approach, the only big change to your original code is to use Flow.mapAsync instead of Flow.map, to handle the fact that a Future is being created in the function:

    val parallelism = 10
    
    val timeout : FiniteDuration = ??? //you need to specify the timeout limit
    
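    // Requires an implicit Materializer (for toStrict) and ExecutionContext (for map) in scope.
    // The function takes a single tuple argument so it can be passed straight to mapAsync.
    val convertResponseToFutureByteStr : ((Try[HttpResponse], User)) => Future[EnhancedUser] = {
      case (Failure(ex), user)   => 
        Future successful user
      case (Success(resp), user) => 
        resp
          .entity
          .toStrict(timeout)
          // toStrict yields an HttpEntity.Strict; its data field holds the ByteString
          .map(strict => new EnhancedUser(user.data, strict.data))
    }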
    
    val cityResponse : Flow[(Try[HttpResponse], User), EnhancedUser, _] =
      Flow[(Try[HttpResponse], User)].mapAsync(parallelism)(convertResponseToFutureByteStr)
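
To see what mapAsync does with the Futures, here is a small self-contained sketch that leaves HTTP out entirely. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to run a stream); User, lookupCity and the artificial delays are hypothetical stand-ins for the HTTP call plus unmarshalling.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object MapAsyncSketch {
  // Hypothetical stand-in for the question's user type.
  final case class User(name: String, city: Option[String] = None)

  // Hypothetical stand-in for "request + unmarshal": completes after a delay.
  // "alice" is deliberately the slowest to show that ordering is preserved.
  def lookupCity(user: User)(implicit system: ActorSystem): Future[User] = {
    import system.dispatcher
    val delay = if (user.name == "alice") 300.millis else 50.millis
    akka.pattern.after(delay, system.scheduler)(
      Future.successful(user.copy(city = Some(s"city-of-${user.name}")))
    )
  }

  def run(): Seq[User] = {
    implicit val system: ActorSystem = ActorSystem("map-async-sketch")
    val users = List(User("alice"), User("bob"), User("carol"))
    // mapAsync emits an element only once its Future has completed,
    // and emits results in the order of the incoming elements.
    val done: Future[Seq[User]] =
      Source(users)
        .mapAsync(parallelism = 2)(u => lookupCity(u))
        .runWith(Sink.seq)
    // Await is used here only to collect the demo output.
    try Await.result(done, 5.seconds)
    finally system.terminate()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Because each element is emitted only after its Future completes, every user reaches the next stage with its city already set. That is the difference from registering an onComplete callback inside map, where the element is passed downstream immediately and the callback may run later.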