I have a ListenerActor which is listening to messages from backend and pushing the messages through channel as SSE Events.
I want to keep my actor alive so that i can stream continuously. How do i add keepAlive to my actor.
P.S: I am not using Akka stream or Akka http.
def filter(inboxId:String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] = Enumeratee.filter[SSEPublisher.ListenerEnvelope] { envelope: SSEPublisher.ListenerEnvelope => envelope.inboxId == inboxId }
def convert: Enumeratee[SSEPublisher.ListenerEnvelope, String] = Enumeratee.map[SSEPublisher.ListenerEnvelope] {
envelope =>
Json.toJson(envelope).toString()
}
def connDeathWatch(addr: String): Enumeratee[SSEPublisher.ListenerEnvelope, SSEPublisher.ListenerEnvelope] =
Enumeratee.onIterateeDone { () => println(addr + " - SSE disconnected")
}
implicit def pair[E]: EventNameExtractor[E] = EventNameExtractor[E] { p =>
val parsedJson = scala.util.parsing.json.JSON.parseFull(s"$p").get
val topic = parsedJson.asInstanceOf[Map[String, String]].apply("topic")
Some(topic)
}
implicit def id[E]: EventIdExtractor[E] = EventIdExtractor[E](p => Some(UUID.randomUUID().toString))
def events(inboxId: String) = InboxResource(inboxId)(AuthScope.Basic)(authUser => Action { implicit request =>
Ok.feed(content = ncf.sseEnumerator
&> filter(inboxId)
&> convert
&> EventSource()
).as("text/event-stream")
})
override def receive: Receive = {
case Tick =>
log.info(s"sending re-register tick to event-publisher")
Topics.all.foreach { a: Topic =>
log.info(s"$a")
clusterClient ! ClusterClient.SendToAll(publisherPath, SSEPublisher.AddListener(a, self))
}
case ListenerEnvelope(topic, inboxId, itemId, sourceId, message) =>
log.info(s"Received message from event publisher for topic $topic, for inbox $inboxId, msg : $message")
channel.push(SSEPublisher.ListenerEnvelope(topic, inboxId, itemId, sourceId, message))
}
You can create a keepAlive protocol at the actor level and use the scheduler to send the keepAlive message to the actor.
def convert(t: SomeType): Enumeratee[SSEPublisher.ListenerEnvelope, String] =
// pattern match on type t
}