Search code examples
scala, playframework, akka, server-sent-events, keep-alive

How to add keepAlive event to SSE channel


I have a ListenerActor that listens for messages from the backend and pushes them through a channel as SSE events.

I want to keep my actor alive so that I can stream continuously. How do I add a keep-alive to my actor?

P.S: I am not using Akka stream or Akka http.

/** Lets through only the envelopes whose `inboxId` equals the requested one. */
def filter(inboxId:String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] =
  Enumeratee.filter[SSEPublisher.ListenerEnvelope](_.inboxId == inboxId)

/** Turns each envelope into the string form of its JSON representation. */
def convert: Enumeratee[SSEPublisher.ListenerEnvelope, String] =
  Enumeratee.map[SSEPublisher.ListenerEnvelope] { incoming =>
    val asJson = Json.toJson(incoming)
    asJson.toString()
  }

/** Pass-through stage that prints a disconnect notice for `addr` once the
  * downstream iteratee is done.
  */
def connDeathWatch(addr: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] = {
  // Evaluated only when the callback fires, not when the stage is built.
  def reportDisconnect(): Unit = println(addr + " - SSE disconnected")

  Enumeratee.onIterateeDone[SSEPublisher.ListenerEnvelope](reportDisconnect _)
}

/** Derives the SSE event name from the `topic` field of the event payload.
  *
  * The payload is stringified and parsed as JSON. When it is a JSON object
  * with a string-valued `topic` entry, that value becomes the event name.
  * In every other case (not JSON, not an object, `topic` missing or not a
  * string) no event name is produced, rather than throwing from inside the
  * extractor.
  */
implicit def pair[E]: EventNameExtractor[E] = EventNameExtractor[E] { p =>
  scala.util.parsing.json.JSON.parseFull(s"$p") match {
    // Runtime-checked match: a JSON object parses to a Map, and only a
    // String-typed "topic" value is accepted.
    case Some(fields: Map[_, _]) =>
      fields.collectFirst { case ("topic", topic: String) => topic }
    case _ =>
      None
  }
}

/** Assigns every event a freshly generated random UUID as its SSE id; the event itself is ignored. */
implicit def id[E]: EventIdExtractor[E] = EventIdExtractor[E] { _ =>
  val freshId = UUID.randomUUID().toString
  Some(freshId)
}

/** SSE endpoint for one inbox: feeds the shared enumerator, filtered to
  * `inboxId`, converted to JSON strings and framed by `EventSource`, as a
  * `text/event-stream` response.
  */
def events(inboxId: String) = InboxResource(inboxId)(AuthScope.Basic)(authUser => Action { implicit request =>
  // Pipeline: source -> per-inbox filter -> JSON string -> SSE framing.
  val sseStream = ncf.sseEnumerator &> filter(inboxId) &> convert &> EventSource()
  Ok.feed(content = sseStream).as("text/event-stream")
})

/** Actor behaviour.
  *
  *  - `Tick`: sends an `AddListener` for every topic in `Topics.all` to the
  *    publisher via the cluster client, with this actor as the listener.
  *  - `ListenerEnvelope`: logs the message and pushes an equivalent
  *    `SSEPublisher.ListenerEnvelope` onto the channel.
  */
override def receive: Receive = {
  case Tick =>
    log.info("sending re-register tick to event-publisher")
    for (registeredTopic <- Topics.all) {
      log.info(s"$registeredTopic")
      val registration = SSEPublisher.AddListener(registeredTopic, self)
      clusterClient ! ClusterClient.SendToAll(publisherPath, registration)
    }

  case ListenerEnvelope(topic, inboxId, itemId, sourceId, message) =>
    log.info(s"Received message from event publisher for topic $topic, for inbox $inboxId, msg : $message")
    val outgoing = SSEPublisher.ListenerEnvelope(topic, inboxId, itemId, sourceId, message)
    channel.push(outgoing)
}

Solution

  • You can define a keepAlive message as part of the actor's protocol and use the scheduler to send that message to the actor at a regular interval.

    // Sketch only — incomplete as posted (no body, unmatched closing brace).
    // Variant of `convert` that takes an extra argument `t`.
    // NOTE(review): presumably `t` distinguishes keep-alive messages from
    // regular envelopes so each can be rendered differently — confirm intent.
    def convert(t: SomeType): Enumeratee[SSEPublisher.ListenerEnvelope, String] = 
    // pattern match on type t
      }