I'm trying to cache a streaming source to disk while I'm also sending it out as an HttpResponse, i.e. I have a Source[ByteString,_] that I want to hand to HttpEntity, but I also want to run the same data into a FileIO.toPath sink.
                        |-> FileIO.toPath
Source[ByteString,_] ->|
                        |-> HttpEntity(contentType, Source[ByteString,_])
It seems that Broadcast is what I should use for fan-out, but from the description it writes to two sinks, and while FileIO.toPath is a sink, HttpEntity expects a Source.
There's also Source.fromGraph, which looks like it would create a source from a GraphStage, such as a Broadcast stage, but I can't quite figure out how I would get the FileIO sink in there.
You can use alsoTo:
val originalSource: Source[ByteString, _] = ???
val cachedSource: Source[ByteString, _] = originalSource.alsoTo(FileIO.toPath(/*...*/))
val entity = HttpEntity(contentType, cachedSource)
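
One thing the snippet above leaves open is how to tell when the cache file has been fully written, since alsoTo discards the sink's materialized value. Below is a minimal sketch of one way to keep it, using alsoToMat with Keep.right so the resulting source materializes the file sink's Future[IOResult]. It assumes Akka 2.6 (where an implicit ActorSystem supplies the materializer). The object name CacheWhileStreaming, the run helper, the temp file, and the Sink.fold standing in for the HTTP client are all made up for illustration and are not part of the original question.

```scala
import java.nio.file.{Files, Path}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object; the names here are illustrative only.
object CacheWhileStreaming {

  /** Streams `chunks` downstream while also writing them to a temp file.
    * Returns (bytes seen downstream, bytes found in the file afterwards). */
  def run(chunks: List[String]): (ByteString, ByteString) = {
    implicit val system: ActorSystem = ActorSystem("cache-demo")
    try {
      // Stand-in for the real cache location.
      val cacheFile: Path = Files.createTempFile("cache-demo", ".bin")

      val originalSource: Source[ByteString, NotUsed] =
        Source(chunks.map(ByteString(_)))

      // Like alsoTo, but keeps the file sink's Future[IOResult] as the
      // materialized value instead of dropping it.
      val cachedSource: Source[ByteString, Future[IOResult]] =
        originalSource.alsoToMat(FileIO.toPath(cacheFile))(Keep.right)

      // Stand-in for the HTTP layer consuming the entity's data bytes:
      // collect everything that flows downstream.
      val (fileWritten, downstream) =
        cachedSource
          .toMat(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))(Keep.both)
          .run()

      val sent = Await.result(downstream, 5.seconds)
      // Wait for the file sink to finish before reading the file back.
      Await.result(fileWritten, 5.seconds)

      (sent, ByteString(Files.readAllBytes(cacheFile)))
    } finally {
      Await.result(system.terminate(), 5.seconds)
    }
  }

  def main(args: Array[String]): Unit = {
    val (sent, cached) = run(List("hello ", "streaming ", "world"))
    println(s"downstream and file agree: ${sent == cached}")
  }
}
```

A source built this way can still be passed to HttpEntity(contentType, cachedSource), but the materialized Future is then out of reach, so you would probably need something like mapMaterializedValue to attach a completion callback before handing it over. Also keep in mind that, per the alsoTo documentation, the stream backpressures when either side is slow and cancels when either side cancels, so a client that disconnects early will most likely leave a truncated cache file behind.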