I'm using the Play framework for a web application I built. Play 2.5 uses the Akka Stream API to allow streaming of request/response.
I have an endpoint where an incoming file is streamed directly to Google Drive.
I define a BodyParser
that looks like that:
BodyParser("toDrive") { request =>
Accumulator.source[ByteString].mapFuture { source =>
Future.successful(Right("Done"))
}
}
I use the source (Source[ByteString, _]
) and feed it into a StreamedBody
that I use with the WSClient
provided by Play.
I would like to use the given Source
and use for two different HTTP call with the WSClient
.
I tried the naive approach by passing the same Source
into two different WSClient
call, but it failed. I think the solution to my problem is broadcasting.
I want to take what's coming out of the source to create 2 sources to be used by my WSClient
.
I'm still playing with Source
, Flow
and Sink
. I'm trying to make sense of it all.
Updated solution:
Accumulator[ByteString, Either[Result, String]] {
val s1 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 1"))
}
val s2 = Sink
.asPublisher[ByteString](fanout = false)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
//do what you need with source
Future.successful(Right("result 2"))
}
def combine(val1: Future[Either[Result, String]],
val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
for {
res1 <- val1
res2 <- val2
} yield {
// do something with your result
res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
}
}
Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[ByteString](2))
broadcast ~> sink
broadcast ~> sink2
SinkShape(broadcast.in)
})
}
To give a little explanation (AFAIK). I create 2 sink and combine them behind a single one. The Accumulator.apply
needs 1 Sink[E, Future[A]]
. The BodyParser
forces me to use ByteString
as E
which is the the type
of data that goes in the sink.
So 2 sinks that takes ByteString
in and materialize as a Future[String]
. I convert the Sink
as a Source
because the API I use (WsClient) can take a Source
as a body. This API gives me a Future[HttpResponse]
(for the sake of the solution, I've simplified this to a Future[String]
but you could do whatever you want in there.
Now this is where the akka-streams
API comes into play. I strongly suggest that you look at the documentation to get a better understanding. With that said, here, I used the GraphDSL API to combine my 2 sink behind a single one. Any ByteString
that comes into the exposed sink is sent into the 2 inner sinks.
Note: there is a convenient Sink.combine
function that takes n
streams and combine them behind one. But using this function means loosing the materialized value (in this case, Future[String]
)
The original solution proposed below is not working properly. It is only sending data to one of the source.
The Play Accumulator
can also be created by giving it a Sink
.
I used this approach and this seems to be working so far:
BodyParser("toDrive") { request =>
def sourceToFut(src: Source): Future[T] = ???
Accumulator[ByteString, Either[Result, T]] {
Sink
.asPublisher[ByteString](fanout = true)
.mapMaterializedValue(Source.fromPublisher)
.mapMaterializedValue { source =>
val upload1Fut = sourceToFut(source)
val upload2Fut = sourceToFut(source)
for {
file1 <- upload1Fut
file2 <- upload2Fut
} yield {
(file1, file2)
}
}
}
}
The only effective changes this has compared to my initial approach is that I create the Sink
myself and allow fanout
so I can use the source twice in two different WSClient
call.
What do you think @expert?