Search code examples
scalaakka-streamakka-http

Multiplex akka streams source into two copies


I'm trying to cache a streaming source to disk while I'm also sending it out as an HttpResponse, i.e. I have a Source[ByteString,_] that I want to hand to HttpEntity, but I also want to run the same data into a FileIO.toPath sink.

                       |-> FileIO.toPath
Source[ByteString,_] ->|
                       |-> HttpEntity(contentType, Source[ByteString,_]

It seems that Broadcast is what I should use for fan-out but from the description, it writes to two sinks and while FileIO.toPath is a sink, HttpEntity expects a Source.

There's also Source.fromGraph which looks like it would create a source from a GraphStage, such as a Broadcast stage, but I can't quite figure out how I would get the FileIO sink in there.


Solution

  • You can use alsoTo:

    val originalSource: Source[ByteString, _] = ???
    val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
    val entity = HttpEntity(contentType, cachedSource)