Search code examples
pythonscalabottlehttp-streamingakka-stream

Scala read continuous http stream


How can I connect to and read a continuous (chunked) http stream in scala? For example, if I have this simple service written in python/bottle:

from gevent import monkey; monkey.patch_all()

import gevent
from bottle import route, run

@route('/stream')
def stream():
    while True:
        yield 'blah\n'
        gevent.sleep(1)

run(host='0.0.0.0', port=8100, server='gevent')

I'm planning to use akka-stream to process the data, I just need a way to retrieve it.


Solution

  • This should work. Basically, you do a single request to an uri that produces a chunked response. The response entity contains a dataBytes stream. In case of a chunked response, this will be the stream of chunks. In case of a non-chunked response (HttpEntity.Strict), this will be a stream with just a single chunk.

    Obviously you can also explicitly match on the entity to see if it is HttpEntity.Chunked, but usually you want to retain the ability to handle non-chunked responses as well.

    In a real world application you would not use runForeach to execute a side effect, but do some processing with the dataBytes stream.

    import akka.actor.ActorSystem
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{Uri, HttpRequest}
    import akka.stream.ActorMaterializer
    
    object ChunkTestClient extends App {
    
      implicit val system = ActorSystem("test")
      import system.dispatcher
    
      implicit val materializer = ActorMaterializer()
      val source = Uri("https://jigsaw.w3.org/HTTP/ChunkedScript")
      val finished = Http().singleRequest(HttpRequest(uri = source)).flatMap { response =>
        response.entity.dataBytes.runForeach { chunk =>
          println(chunk.utf8String)
        }
      }
    }