scala, http, akka, akka-http

How to match an Akka HttpResponse to its HttpRequest?


When I send an Akka HttpRequest and pipe the result to an actor, I can't tell which request a response belongs to. The actor handles every message it receives, but it doesn't know which request produced a given response. Is there a way to identify each request so that I can match it with its response?

Note: I don't control the server, so I can't have it send any part of the request body back.

Thanks in advance

MySelf.scala

import akka.actor.{ Actor, ActorLogging }
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.util.ByteString

class MySelf extends Actor with ActorLogging {

import akka.pattern.pipe
import context.dispatcher

final implicit val materializer: ActorMaterializer = 
       ActorMaterializer(ActorMaterializerSettings(context.system))

def receive = {
  case HttpResponse(StatusCodes.OK, headers, entity, _) =>
    entity.dataBytes.runFold(ByteString(""))(_ ++ _).foreach { body =>
      log.info("Got response, body: " + body.utf8String)
    }
  case resp @ HttpResponse(code, _, _, _) =>
    log.info("Request failed, response code: " + code)
    resp.discardEntityBytes()
}

}

Main.scala

import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer

object HttpServerMain extends App {

import akka.pattern.pipe

//  import system.dispatcher
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
// needed for the future flatMap/onComplete in the end
implicit val executionContext = system.dispatcher

val http = Http(system)

val myActor = system.actorOf(Props[MySelf])

http.singleRequest(HttpRequest(uri = "http://akka.io"))
    .pipeTo(myActor)

http.singleRequest(HttpRequest(uri = "http://akka.io/another-request"))
    .pipeTo(myActor)
Thread.sleep(2000)
system.terminate()

}

Solution

  • You can simply use map to transform the Future and attach an ID (usually called a correlation ID) to the response before you pipe it to myActor:

    http.singleRequest(HttpRequest(uri = "http://akka.io"))
        .map(x => (1, x)).pipeTo(myActor)
    

    You'll need to change your pattern match blocks to take a tuple:

    case (id, HttpResponse(StatusCodes.OK, headers, entity, _)) =>
    

    If you can't or don't want to change your pattern match block, you can use the same approach but instead add a unique HTTP header to the completed response (using copy), with something like this (not checked whether it compiles):

    // make a unique header name that you are sure will not be
    // received from http response:
    val correlationHeader: HttpHeader = ... // mycustomheader
    
    // Basically hack the response to add your header:
    http.singleRequest(HttpRequest(uri = "http://akka.io"))
        .map(x => x.copy(headers = correlationHeader +: x.headers)).pipeTo(myActor)
    
    // Now you can check your header to see which response that was:
    case HttpResponse(StatusCodes.OK, headers, entity, _) =>
      headers.find(_.is("mycustomheader")).map(_.value).getOrElse("NA")
    

    This is more of a hack than the previous option, though, because you are modifying the response.
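
The first option (tagging the Future's result with a correlation ID before piping it) can be sketched without any Akka dependency. Everything named here is hypothetical and only for illustration: FakeResponse stands in for HttpResponse, fakeSingleRequest stands in for http.singleRequest, and Tagged is a small wrapper used instead of a bare tuple. It is a minimal sketch of the idea, not the actual Akka HTTP API.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CorrelationSketch {
  // Hypothetical stand-in for akka-http's HttpResponse (assumption, not the real type).
  final case class FakeResponse(status: Int, body: String)

  // Hypothetical wrapper pairing a response with the ID chosen when the request was sent.
  final case class Tagged(correlationId: Long, response: FakeResponse)

  // Hypothetical stand-in for http.singleRequest(HttpRequest(uri = ...)).
  def fakeSingleRequest(uri: String): Future[FakeResponse] =
    Future(FakeResponse(200, s"body of $uri"))

  // Tag the result inside the Future; in the real code this is where pipeTo(myActor) would follow.
  def send(correlationId: Long, uri: String): Future[Tagged] =
    fakeSingleRequest(uri).map(resp => Tagged(correlationId, resp))

  // Roughly what the actor's receive would do: match on the wrapper and recover the ID.
  def describe(msg: Any): String = msg match {
    case Tagged(id, FakeResponse(200, body)) => s"request $id succeeded: $body"
    case Tagged(id, FakeResponse(code, _))   => s"request $id failed with status $code"
    case other                               => s"unexpected message: $other"
  }

  // Blocks only for the sake of the demonstration.
  def demo(): String =
    describe(Await.result(send(1L, "http://akka.io"), 5.seconds))
}
```

A named wrapper such as Tagged reads a little more clearly in a pattern match than a tuple, but the tuple from the answer works the same way. In real code the ID would typically come from a counter or a UUID rather than a hard-coded literal.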