How can we consume SSE in scala play framework? Most of the resources that I could find were to make an SSE source. I want to reliably listen to SSE events from other services (with autoconnect). The most relevant article was https://doc.akka.io/docs/alpakka/current/sse.html . I implemented this but this does not seem to work (code below). Also the event that I am su
@Singleton
class SseConsumer @Inject()((implicit ec: ExecutionContext) {
implicit val system = ActorSystem()
val send: HttpRequest => Future[HttpResponse] = foo
def foo(x:HttpRequest) = {
try {
println("foo")
val authHeader = Authorization(BasicHttpCredentials("user", "pass"))
val newHeaders = x.withHeaders(authHeader)
Http().singleRequest(newHeaders)
}catch {
case e:Exception => {
println("Exception", e.printStackTrace())
throw e
}
}
}
val eventSource: Source[ServerSentEvent, NotUsed] =
EventSource(
uri = Uri("https://abc/v1/events"),
send,
initialLastEventId = Some("2"),
retryDelay = 1.second
)
def orderStatusEventStable() = {
val events: Future[immutable.Seq[ServerSentEvent]] =
eventSource
.throttle(elements = 1, per = 500.milliseconds, maximumBurst = 1, ThrottleMode.Shaping)
.take(10)
.runWith(Sink.seq)
events.map(_.foreach( x => {
println("456")
println(x.data)
}))
}
Future {
blocking{
while(true){
try{
Thread.sleep(2000)
orderStatusEventStable()
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
}
This does not give any exceptions and println("456") is never printed.
EDIT:
Future {
blocking {
while(true){
try{
Await.result(orderStatusEventStable() recover {
case e: Exception => {
println("exception", e)
throw e
}
}, Duration.Inf)
} catch {
case e:Exception => {
println("Exception", e.printStackTrace())
}
}
}
}
}
Added an await and it started working. Able to read 10 messages at a time. But now I am faced with another problem. I have a producer which can at times produce faster than I can consume and with this code I have 2 issues:
The issue in your code is that the events: Future
would only complete when the stream (eventSource
) completes.
I'm not familiar with SSE but the stream likely never completes in your case as it's always listening for new events.
You can learn more in Akka Stream documentation.
Depending on what you want to do with the events, you could just map
on the stream like:
eventSource
...
.map(/* do something */)
.runWith(...)
Basically, you need to work with the Akka Stream Source
as data is going through it but don't wait for its completion.
EDIT: I didn't notice the take(10)
, my answer applies only if the take
was not here. Your code should work after 10 events sent.