scala akka-stream server-sent-events playframework-2.6

Play Framework Scala: create an infinite source using Akka Streams and keep the server-sent events connection open on the server


We have a requirement to implement server-sent events for the following use cases:

  1. Send a notification to the UI after some processing on the server. This processing is based on some logic.
  2. Send a notification to the UI after reading messages from RabbitMQ and performing some operation on them.

The technology stack we are using is Scala (2.11/2.12) with Play Framework (2.6.x). Library: akka.stream.scaladsl.Source

We started our proof of concept with the example at https://github.com/playframework/play-scala-streaming-example and then extended it by creating different sources. We tried creating sources using Source.apply and Source.single.

But as soon as all elements in the source have been pushed to the UI, my event stream gets closed. I don't want the event stream to close, and I also don't want to use a timer (Source.tick) or Source.repeat.

When my source was created, the collection had, let's say, x elements, and then the service added 4 more elements. But after the first x elements were sent, the event stream got closed and then reopened.

Is there any way to make my event stream infinite, so that it is closed only when my session is logged off or when we explicitly close it?

// Code for keepAlive (as asked in the comments)

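    import javax.inject.{Inject, Singleton}
    import scala.collection.mutable.{Map, Queue}
    import scala.concurrent.duration._
    import akka.stream.scaladsl.Source
    import play.api.http.ContentTypes
    import play.api.libs.EventSource
    import play.api.libs.json.{JsValue, Json}
    import play.api.mvc.{AbstractController, ControllerComponents}

    object NotficationUtil {

      // Pending notifications per user id (mutable and not thread-safe)
      val userNotificationMap = Map[Integer, Queue[String]]()

      def addUserNotification(userId: Integer, message: String): Unit = {
        val queue = userNotificationMap.getOrElseUpdate(userId, Queue[String]())
        queue += message
      }

      // Source.single emits one element (the drained queue) and then completes
      def pushNotification(userId: Integer): Source[JsValue, _] = {
        val queue = userNotificationMap.getOrElse(userId, Queue[String]())
        Source.single(Json.toJson(queue.dequeueAll(_ => true)))
      }
    }

    // FlowFactory is not defined in this snippet (presumably a trait from the project)
    @Singleton
    class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory {

      def pushNotifications(user_id: Integer) = Action {
        val stream = NotficationUtil.pushNotification(user_id)
        Ok.chunked(stream.keepAlive(50.seconds, () => Json.obj("data" -> "heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
      }
    }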

Solution

  • Use the code below to create an ActorRef and a Publisher (an implicit Materializer must be in scope for run()):

    val (ref, sourcePublisher)= Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail).toMat(Sink.asPublisher(true))(Keep.both).run()
    

    Then create your source from this publisher:

    val testsource = Source
          .fromPublisher[T](sourcePublisher)
    

    And serve it to the client as a chunked event-stream response:

    Ok.chunked(
            testsource.keepAlive(
              50.seconds,
              () => Json.obj("data"->"heartbeat")) via EventSource.flow)
          .as(ContentTypes.EVENT_STREAM)
          .withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")
    

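    Send your JSON data to the ref actor, and the data will flow as an event stream through this source to the front end. Because the source is backed by an actor rather than a fixed collection, it does not complete when the currently available elements have been sent. Hope it helps.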
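    The three steps above can be put together in a single controller. The following is only a minimal sketch, assuming Play 2.6 with the Akka 2.5 streams API it ships with. The controller name `NotificationStreamController` and the helpers `publish` and `close` are hypothetical and not part of the original answer. It also uses one shared stream for all clients rather than one per user. Sending `akka.actor.Status.Success` to the ref completes the source, which is one way to close the event stream explicitly.

```scala
import javax.inject.{Inject, Singleton}

import scala.concurrent.duration._

import akka.actor.Status
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import play.api.http.ContentTypes
import play.api.libs.EventSource
import play.api.libs.json.{JsValue, Json}
import play.api.mvc.{AbstractController, ControllerComponents}

// Hypothetical controller; the wiring follows the three steps of the answer.
@Singleton
class NotificationStreamController @Inject() (cc: ControllerComponents)(implicit mat: Materializer)
    extends AbstractController(cc) {

  // Step 1: materialize once. `ref` receives messages, `publisher` exposes them.
  private val (ref, publisher) =
    Source
      .actorRef[JsValue](Int.MaxValue, OverflowStrategy.fail)
      .toMat(Sink.asPublisher[JsValue](fanout = true))(Keep.both)
      .run()

  // Element injected by keepAlive when the stream has been idle for 50 seconds.
  private val heartbeat: JsValue = Json.obj("data" -> "heartbeat")

  // Steps 2 and 3: build a source from the publisher and serve it as an event stream.
  def stream = Action {
    val events = Source
      .fromPublisher(publisher)
      .keepAlive(50.seconds, () => heartbeat)

    Ok.chunked(events via EventSource.flow[JsValue])
      .as(ContentTypes.EVENT_STREAM)
      .withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")
  }

  // Hypothetical helper: every JsValue sent to `ref` is emitted by the source.
  def publish(message: String): Unit =
    ref ! Json.obj("message" -> message)

  // Hypothetical helper: completes the source, which ends the event stream.
  def close(): Unit =
    ref ! Status.Success("done")
}
```

    Any server-side code (for example the RabbitMQ consumer from the question) could then call `publish` whenever a notification is ready, instead of building a new finite source per request.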