We have a requirement to implement server-sent events for the following use cases:
Our technology stack is Scala (2.11/2.12) with Play Framework (2.6.x). Library: akka.stream.scaladsl.Source
We started our proof of concept from the example at https://github.com/playframework/play-scala-streaming-example and then extended it by creating different sources. We tried creating the source with Source.apply and Source.single.
But as soon as all elements in the source have been pushed to the UI, the event stream is closed. I don't want the event stream to close. I also don't want to use a timer (Source.tick) or Source.repeat.
When the source was created, the collection had, say, x elements, and the service then added 4 more. After the first x elements were sent, the event stream closed and was then reopened.
Is there any way to keep the event stream open indefinitely, so that it closes only when the session is logged off or when we close it explicitly?
// Code for keepAlive (as asked in the comments)
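import akka.stream.scaladsl.Source
import play.api.libs.json.{JsValue, Json}
import scala.collection.mutable

object NotficationUtil {
  // Pending messages per user; mutable collections are needed for put and +=.
  val userNotificationMap = mutable.Map[Integer, mutable.Queue[String]]()

  def addUserNotification(userId: Integer, message: String) = {
    val queue = userNotificationMap.getOrElse(userId, mutable.Queue[String]())
    queue += message
    userNotificationMap.put(userId, queue)
  }

  // Drains the user's queue into one JSON array; Source.single emits it once and then completes.
  def pushNotification(userId: Integer): Source[JsValue, _] = {
    val queue = userNotificationMap.getOrElse(userId, mutable.Queue[String]())
    Source.single(Json.toJson(queue.dequeueAll { x => true }))
  }
}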
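import javax.inject.{Inject, Singleton}
import play.api.http.ContentTypes
import play.api.libs.EventSource
import play.api.libs.json.Json
import play.api.mvc.{AbstractController, ControllerComponents}
import scala.concurrent.duration._
@Singleton
class EventSourceController @Inject() (cc: ControllerComponents) extends AbstractController(cc) with FlowFactory {
  def pushNotifications(user_id: Integer) = Action {
    val stream = NotficationUtil.pushNotification(user_id)
    Ok.chunked(stream.keepAlive(50.seconds, () => Json.obj("data" -> "heartbeat")) via EventSource.flow).as(ContentTypes.EVENT_STREAM)
  }
}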
Use the code below to create an ActorRef and a publisher (an implicit Materializer must be in scope):
val (ref, sourcePublisher) = Source.actorRef[T](Int.MaxValue, OverflowStrategy.fail).toMat(Sink.asPublisher(true))(Keep.both).run()
Then create your source from this publisher:
val testsource = Source.fromPublisher[T](sourcePublisher)
Then return it from your action as a chunked event-stream response:
Ok.chunked(
    testsource.keepAlive(
      50.seconds,
      () => Json.obj("data" -> "heartbeat")) via EventSource.flow)
  .as(ContentTypes.EVENT_STREAM)
  .withHeaders("X-Accel-Buffering" -> "no", "Cache-Control" -> "no-cache")
Send your JSON data to the ref actor, and it will flow through this source to the front end as an event stream. Hope it helps.
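To make the "send your JSON data to ref" step concrete, here is a minimal sketch of how the pieces might fit together. It assumes the Akka 2.5 API that ships with Play 2.6. The names NotificationHub, publish and sourceFor are hypothetical, the buffer size of 256 with OverflowStrategy.dropHead is my own choice rather than something from the code above, and the standalone ActorSystem is only there to keep the snippet self-contained (in a Play app you would normally inject the Materializer instead).

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Keep, Sink, Source}
import play.api.libs.json.{JsValue, Json}

// Hypothetical holder for the shared stream; not part of Play or Akka.
object NotificationHub {
  // Assumption: a standalone system so the sketch compiles on its own.
  implicit val system: ActorSystem = ActorSystem("notifications")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Materialize once: `ref` accepts messages, `publisher` fans them out.
  // Buffer size and overflow strategy are illustrative assumptions.
  val (ref, publisher) =
    Source.actorRef[JsValue](256, OverflowStrategy.dropHead)
      .toMat(Sink.asPublisher[JsValue](fanout = true))(Keep.both)
      .run()

  // Does not complete on its own when no data is pending, unlike Source.single.
  val source: Source[JsValue, _] = Source.fromPublisher(publisher)

  // Push a notification into the stream by sending it to the actor.
  def publish(userId: Int, message: String): Unit =
    ref ! Json.obj("userId" -> userId, "message" -> message)

  // Hypothetical per-user view: keep only events tagged with this user's id.
  def sourceFor(userId: Int): Source[JsValue, _] =
    source.filter(js => (js \ "userId").asOpt[Int].contains(userId))
}
```

With something like this in place, the service would call NotificationHub.publish(userId, message) whenever a notification is created, and the action would pass NotificationHub.sourceFor(user_id) through keepAlive and EventSource.flow into Ok.chunked in place of the Source.single based stream.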