
How to use an Akka Streams SourceQueue with PlayFramework


I would like to use a SourceQueue to push elements dynamically into an Akka Streams source. A Play controller needs a Source to stream a result with the chunked method.
Because Play runs the source with its own Akka Streams Sink under the hood, I can't materialize the source queue myself with a Sink: the source would already be consumed before the chunked method uses it (unless I use the hack below).

I can make it work by pre-materializing the source queue through a reactive-streams publisher, but it's a bit of a dirty hack:

def sourceQueueAction = Action {
    // Materialize the queue right away; the publisher side is handed to Play below
    val (queue, pub) = Source.queue[String](10, OverflowStrategy.fail)
      .toMat(Sink.asPublisher(false))(Keep.both)
      .run()

    // stupid example to push elements dynamically
    val tick = Source.tick(0.seconds, 1.second, "tick")
    tick.runForeach(t => queue.offer(t))

    Ok.chunked(Source.fromPublisher(pub))
}

Is there a simpler way to use an Akka Streams SourceQueue with PlayFramework?

Thanks


Solution

  • The solution is to use mapMaterializedValue on the source to obtain a Future of the queue it materializes. The Future completes once Play materializes the source:

    def sourceQueueAction = Action {
      val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

      // Runs once Play has materialized queueSource;
      // needs an implicit ExecutionContext and Materializer in scope
      futureQueue.foreach { queue =>
        Source.tick(0.seconds, 1.second, "tick")
          .runForeach(t => queue.offer(t))
      }

      Ok.chunked(queueSource)
    }

    // T is the element type of the source, here String
    // M is the materialized value type, here a SourceQueue[String]
    def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
      val p = Promise[M]()
      val s = src.mapMaterializedValue { m =>
        // Complete the promise with the materialized value and pass it through unchanged
        p.trySuccess(m)
        m
      }
      (s, p.future)
    }
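
To see the helper work outside of Play, here is a minimal standalone sketch. It assumes Akka 2.6, where an implicit ActorSystem also provides the materializer; the object name PeekMatValueDemo, the actor system name, and the use of Sink.seq as a stand-in for Play's internal sink are illustrative choices, not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}

// Hypothetical demo object, assuming Akka 2.6 on the classpath
object PeekMatValueDemo {

  // Same helper as in the answer: exposes the materialized value as a Future
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val p = Promise[M]()
    val s = src.mapMaterializedValue { m =>
      p.trySuccess(m)
      m
    }
    (s, p.future)
  }

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("peek-demo")
    import system.dispatcher // ExecutionContext for the Future callbacks

    try {
      val (queueSource, futureQueue) =
        peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

      // Nothing has been materialized yet, so the queue is not available
      assert(!futureQueue.isCompleted)

      // Stand-in for Play: some other code materializes the source
      val collected: Future[Seq[String]] = queueSource.runWith(Sink.seq)

      // The queue is now available; push two elements, then complete the stream
      val pushed: Future[Unit] = futureQueue.flatMap { queue =>
        for {
          _ <- queue.offer("a")
          _ <- queue.offer("b")
        } yield queue.complete()
      }

      Await.result(pushed, 5.seconds)
      Await.result(collected, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling PeekMatValueDemo.run() should return the elements offered through the queue. The point of the sketch is the ordering: the Future stays pending until whoever owns the sink (Play, in the controller case) runs the source, so any code that pushes elements has to be chained onto that Future.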