Search code examples
akkaakka-streamakka-http

Akka http streaming the response headers


By definition the http response is split in 3 parts, status-code -> headers -> body, and when doing akka client http requests the http response is received after the first 2 parts have been completely received.

  val responseFuture: Future[HttpResponse]
  responseFuture.map {
    case HttpResponse(statusCode:StatusCode, headers:Seq[HttpHeader], entity:ResponseEntity, protocol:HttpProtocol)
  }

This is completely fine for most use cases, but in my particular case I need access to the headers before all the headers are received (a third party server is returning progress by writing custom progress headers until the response is ready). Is there any way to access the headers the same way we access the body?

  val entity: ResponseEntity
  val entitySource:Source[ByteString, Any] = entity.dataBytes

In the perfect world there would be a way to access the headers as a source as well

HttpResponse(statusCode:StatusCode, headers:Source[HttpHeader, NotUsed], entity:ResponseEntity, protocol:HttpProtocol)

Solution

  • Not Possible With akka-http

    The representation of HttpResponse treats the headers as a Seq[HttpHeader] instead of an Iterator or an akka-stream Source. Therefore, as explained in the question, it is not possible to instantiate an HttpResponse object without having all the header values available first.

    I do not know the exact reason behind this design decision but I suspect it is because it would be difficult to support a Source for the headers and a Source for the body. The body Source would not be able to be consumed without first consuming the header Source, so there would have to be a strict ordering of accessing the response's member variables. This would lead to confusion and unexpected errors.

    Lower Level Processing with akka-stream

    The hypertext transfer protocol is just an application layer protocol, usually on top of TCP. And, it is a fairly simple message format:

    The response message consists of the following:

    • A status line which includes the status code and reason message (e.g., HTTP/1.1 200 OK, which indicates that the client's request succeeded).
    • Response header fields (e.g., Content-Type: text/html).
    • An empty line.
    • An optional message body.

    Therefore, you could use the Tcp binding to get a connection and parse the message ByteString Source yourself to get at the headers:

    val maximumFrameLength = 1024 * 1024
    
    val endOfMessageLine : () => Byte => Boolean = () => {
      var previousWasCarriage = false
    
      (byte) => 
        if(byte == '\r') {
          previousWasCarriage = true
          false
        }
        else if(byte == '\n' && previousWasCarriage) {
          previousWasCarriage = false
          true
        }
        else {
          previousWasCarriage = false
          false
        }
    }
    
    def subFlow = 
      Flow[ByteString].flatMapConcat(str => Source.fromIterable(str))
                      .splitAfter(endOfMessageLine())
    

    Unfortunately this probably requires that your request be sent as a raw ByteString via the Tcp binding as well.