
Use source twice with akka-stream


I'm using the Play framework for a web application I built. Play 2.5 uses the Akka Stream API to allow streaming of request/response.

I have an endpoint where an incoming file is streamed directly to Google Drive.

I define a BodyParser that looks like this:

BodyParser("toDrive") { request =>
  Accumulator.source[ByteString].mapFuture { source =>
    Future.successful(Right("Done"))
  }
}

I use the source (Source[ByteString, _]) and feed it into a StreamedBody that I use with the WSClient provided by Play.

I would like to use the given Source for two different HTTP calls with the WSClient.

I tried the naive approach of passing the same Source to two different WSClient calls, but it failed. I think the solution to my problem is broadcasting.

I want to take what comes out of the source and create two sources from it, one for each WSClient call.

I'm still playing with Source, Flow and Sink. I'm trying to make sense of it all.


Solution

  • Updated solution:

      Accumulator[ByteString, Either[Result, String]] {
        val s1 = Sink
          .asPublisher[ByteString](fanout = false)
          .mapMaterializedValue(Source.fromPublisher)
          .mapMaterializedValue { source =>
            //do what you need with source
            Future.successful(Right("result 1"))
          }
        val s2 = Sink
          .asPublisher[ByteString](fanout = false)
          .mapMaterializedValue(Source.fromPublisher)
          .mapMaterializedValue { source =>
            //do what you need with source
            Future.successful(Right("result 2"))
          }
    
        def combine(val1: Future[Either[Result, String]],
                    val2: Future[Either[Result, String]]): Future[Either[Result, String]] = {
          for {
            res1 <- val1
            res2 <- val2
          } yield {
            // do something with your result
            res1.right.flatMap(val1 => res2.right.map(val2 => val1 + val2))
          }
        }
    
        Sink.fromGraph(GraphDSL.create(s1, s2)(combine) { implicit b => (sink, sink2) =>
          import GraphDSL.Implicits._
    
          val broadcast = b.add(Broadcast[ByteString](2))
    
          broadcast ~> sink
          broadcast ~> sink2
    
          SinkShape(broadcast.in)
        })
      }
    

    To give a little explanation (AFAIK): I create two sinks and combine them behind a single one. Accumulator.apply needs a single Sink[E, Future[A]]. The BodyParser forces me to use ByteString as E, which is the type of data that goes into the sink.

    So there are two sinks that take ByteString in and each materialize as a Future (a Future[Either[Result, String]] in the code above). I convert each Sink into a Source because the API I use (WSClient) can take a Source as a body. That API gives me a Future[WSResponse]; for the sake of the solution I've simplified this to a Future[String], but you could do whatever you want in there.
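
The Sink-to-Source conversion described above can be tried in isolation. This is only a minimal sketch, assuming the Akka Streams 2.4/2.5 API that ships with Play 2.5 (on Akka 2.6+ the implicit ActorSystem alone should be enough as a materializer). The object name SinkAsSource and the sample chunks are made up for illustration; in the real parser the Source would be handed to WSClient instead of being folded into a String.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object SinkAsSource {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("sink-as-source")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // A Sink whose materialized value is a Source emitting whatever the Sink receives.
      val sinkAsSource: Sink[ByteString, Source[ByteString, NotUsed]] =
        Sink
          .asPublisher[ByteString](fanout = false)
          .mapMaterializedValue(publisher => Source.fromPublisher(publisher))

      // Running the upstream into that Sink hands back the bridged Source.
      val bridged: Source[ByteString, NotUsed] =
        Source(List(ByteString("ab"), ByteString("cde"))).runWith(sinkAsSource)

      // Stand-in for the real consumer (for example an HTTP upload).
      val text = bridged.runFold("")((acc, bs) => acc + bs.utf8String)
      Await.result(text, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // prints "abcde"
}
```

With fanout = false the publisher accepts a single subscriber, so each bridged Source can be run only once. That is why the solution builds one such sink per HTTP call.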

    Now this is where the akka-streams API comes into play. I strongly suggest that you look at the documentation to get a better understanding. With that said, here I used the GraphDSL API to combine my two sinks behind a single one. Any ByteString that comes into the exposed sink is sent to both inner sinks.
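
The same GraphDSL wiring can be run outside of Play to see the broadcast at work. This is a sketch under stated assumptions: Akka Streams 2.4/2.5, and two stand-in fold sinks (byteCount and text, names invented here) in place of the real uploads. Their two Futures are combined with zip, playing the role of the combine function in the solution.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SinkShape}
import akka.stream.scaladsl.{Broadcast, GraphDSL, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object BroadcastToTwoSinks {
  def run(): (Int, String) = {
    implicit val system: ActorSystem = ActorSystem("broadcast-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Stand-ins for the two consumers: one counts bytes, one decodes text.
      val byteCount: Sink[ByteString, Future[Int]] =
        Sink.fold[Int, ByteString](0)((acc, bs) => acc + bs.length)
      val text: Sink[ByteString, Future[String]] =
        Sink.fold[String, ByteString]("")((acc, bs) => acc + bs.utf8String)

      // One exposed inlet; every element is broadcast to both inner sinks.
      // The two materialized Futures are zipped into a single Future.
      val combined: Sink[ByteString, Future[(Int, String)]] =
        Sink.fromGraph(
          GraphDSL.create(byteCount, text)((f1, f2) => f1.zip(f2)) { implicit b => (s1, s2) =>
            import GraphDSL.Implicits._

            val broadcast = b.add(Broadcast[ByteString](2))
            broadcast ~> s1
            broadcast ~> s2

            SinkShape(broadcast.in)
          })

      val result = Source(List(ByteString("ab"), ByteString("cde"))).runWith(combined)
      Await.result(result, 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // prints (5,abcde)
}
```

Broadcast only emits when all of its outputs have demand, so the slower of the two consumers sets the pace for both.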

    Note: there is a convenient Sink.combine function that takes n sinks and combines them behind one. But using this function means losing the materialized value (in this case, Future[String]).

    The original solution proposed below does not work properly: it only sends data to one of the sources.
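
For comparison, here is roughly what the Sink.combine variant looks like. It is a sketch assuming the Akka 2.4/2.5 two-sink signature, where the combined sink materializes as NotUsed. Because nothing useful comes back from running it, the results in this example leave the stream through two Promises; those Promises and the branch names are purely illustrative and not part of the original solution.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Broadcast, Flow, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object SinkCombineDemo {
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("sink-combine-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Side channels (hypothetical): the combined sink gives us no Future to wait on.
      val totalBytes = Promise[Int]()
      val totalChunks = Promise[Int]()

      // Each branch folds the stream and publishes its single result via a Promise.
      val countBytes: Sink[ByteString, NotUsed] =
        Flow[ByteString]
          .fold(0)((acc, bs) => acc + bs.length)
          .to(Sink.foreach[Int] { n => totalBytes.success(n); () })
      val countChunks: Sink[ByteString, NotUsed] =
        Flow[ByteString]
          .fold(0)((acc, _) => acc + 1)
          .to(Sink.foreach[Int] { n => totalChunks.success(n); () })

      // The materialized value of the combined sink is just NotUsed.
      val combined: Sink[ByteString, NotUsed] =
        Sink.combine(countBytes, countChunks)(Broadcast[ByteString](_))

      Source(List(ByteString("ab"), ByteString("cde"))).runWith(combined)

      Await.result(totalBytes.future.zip(totalChunks.future), 3.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run()) // prints (5,2)
}
```

Since an Accumulator needs a Sink[E, Future[A]], the NotUsed result is what makes Sink.combine a poor fit here and the GraphDSL version the more direct route.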

    The Play Accumulator can also be created by giving it a Sink.

    I used this approach and this seems to be working so far:

    BodyParser("toDrive") { request =>
      // T stands for whatever type the upload yields
      def sourceToFut(src: Source[ByteString, _]): Future[T] = ???
    
      Accumulator[ByteString, Either[Result, T]] {
        Sink
          .asPublisher[ByteString](fanout = true)
          .mapMaterializedValue(Source.fromPublisher)
          .mapMaterializedValue { source =>
            val upload1Fut = sourceToFut(source)
            val upload2Fut = sourceToFut(source)
            for {
              file1 <- upload1Fut
              file2 <- upload2Fut
            } yield {
              (file1, file2)
            }
          }
      }
    } 
    

    The only effective change compared to my initial approach is that I create the Sink myself and allow fanout, so I can use the source twice in two different WSClient calls.

    What do you think @expert?