Search code examples
akka-stream

Akka-Stream stream within stream


I am trying to figure out how to handle a situation where in one of your stage you need to make a call that return an InputStream, where I would deal with that stream as a Source of the stage that comes further down.

e.g.

 Source.map(e => Calls that return an InputStream)
 .via(processingFlow).runwith(sink.ignore)

I would like that the element going to Processing flow as those coming from the InputStream. This is basically a situation where I am tailing a file, reading each line, the line give me the information about a call I need to make against a CLI api, when making that call I get the Stdout as an InputStream from which to read the result. Result are going to be huge most of the time, so I can just collect the all thing in memory.


Solution

    • you can use StreamConverters utilities to get Sources and Sinks from java.io streams. More info here.
    • you can use flatMapConcat or flatMapMerge to flatten a stream of Sources into a single stream. More info here.

    A quick example could be:

      val source: Source[String, NotUsed] = ???
      def gimmeInputStream(name: String): InputStream = ???
      val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
    
      source
        .map(gimmeInputStream)
        .flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192))
        .via(processingFlow)
        .runWith(Sink.ignore)
    

    However Akka Streams offers a more idiomatic DSL to read/write files in the FileIO object. More info here.

    The example becomes:

      val source: Source[String, NotUsed] = ???
      val processingFlow: Flow[ByteString, ByteString, NotUsed] = ???
    
      source
        .flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name)))
        .via(processingFlow)
        .runWith(Sink.ignore)