How to forward incoming data via REST to an SSE stream in Quarkus


In my setup, I want to forward certain status changes via an SSE (server-sent events) channel. The status changes are initiated by calling a REST endpoint, so I need to forward each incoming status change to the SSE stream.

What is the best/simplest way to accomplish this in Quarkus?

One solution I can think of is to use an EventBus (https://quarkus.io/guides/reactive-messaging). The status change endpoint would publish an event for each change, and the SSE endpoint would subscribe to these events and push them through the SSE channel.

Is this a viable solution? Are there other (simpler) solutions? Do I need to use the reactive stuff in any case to accomplish this?

Any help is much appreciated!


Solution

  • Dmytro, thanks for pointing me in the right direction. I have opted for Mutiny, fed by an RxJava PublishProcessor, in combination with Kotlin. My code now looks like this:

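    // Imports assume RESTEasy Classic with the javax namespace and RxJava 2;
    // adjust the package names (jakarta.*, io.reactivex.rxjava3.*) to your versions.
    import io.reactivex.Flowable
    import io.reactivex.processors.PublishProcessor
    import io.smallrye.mutiny.Multi
    import javax.enterprise.context.ApplicationScoped
    import javax.enterprise.inject.Default
    import javax.inject.Inject
    import javax.ws.rs.Consumes
    import javax.ws.rs.GET
    import javax.ws.rs.POST
    import javax.ws.rs.Path
    import javax.ws.rs.Produces
    import javax.ws.rs.core.MediaType
    import javax.ws.rs.core.Response
    import org.jboss.logging.Logger
    import org.jboss.resteasy.annotations.SseElementType
    
    data class DeviceStatus(var status: Status = Status.OFFLINE) {
        enum class Status { OFFLINE, CONNECTED, ANALYZING, MAINTENANCE }
    }
    
    @ApplicationScoped
    class DeviceStatusService {
        // Hot source: each status is delivered only to the subscribers connected at that moment
        val deviceStatusProcessor: PublishProcessor<DeviceStatus> = PublishProcessor.create()
        val deviceStatusQueue: Flowable<DeviceStatus> = Flowable.fromPublisher(deviceStatusProcessor)
    
        // Called by the REST endpoint for every incoming status change
        fun pushDeviceStatus(deviceStatus: DeviceStatus) {
            deviceStatusProcessor.onNext(deviceStatus)
        }
    
        // Exposes the RxJava stream as a Mutiny Multi for the SSE endpoint
        fun getStream(): Multi<DeviceStatus> {
            return Multi.createFrom().publisher(deviceStatusQueue)
        }
    }
    
    @Path("/deviceStatus")
    class DeviceStatusResource {
        private val logger: Logger = Logger.getLogger("DeviceStatusResource")
    
        @Inject
        @field:Default
        lateinit var deviceStatusService: DeviceStatusService
    
        // Receives a status change and forwards it to the shared stream
        @POST
        @Consumes(MediaType.APPLICATION_JSON)
        fun status(status: DeviceStatus): Response {
            logger.info("POST /deviceStatus ${status.status}")
            deviceStatusService.pushDeviceStatus(status)
            return Response.ok().build()
        }
    
        // Streams every subsequent status change to the client as JSON-encoded SSE events
        @GET
        @Path("/eventStream")
        @Produces(MediaType.SERVER_SENT_EVENTS)
        @SseElementType(MediaType.APPLICATION_JSON)
        fun stream(): Multi<DeviceStatus> {
            return deviceStatusService.getStream()
        }
    }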
    

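    As a minimal setup, the service could use the deviceStatusProcessor directly as the publisher. Note that Flowable.fromPublisher only wraps the processor and does not add buffering by itself; buffering for slow subscribers would need an explicit operator such as onBackpressureBuffer(). Comments on the implementation are welcome.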
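
    For comparison, the same forwarding can probably be done with Mutiny alone, without the RxJava types. The following is only a minimal sketch under that assumption: StatusBroadcaster is a hypothetical name that does not appear in the code above, and it relies on Mutiny's BroadcastProcessor, which is itself a Multi. In Quarkus the class would typically be an @ApplicationScoped bean so that the POST and SSE endpoints share one instance.

```kotlin
import io.smallrye.mutiny.Multi
import io.smallrye.mutiny.operators.multi.processors.BroadcastProcessor

// Hypothetical helper (not part of the original answer).
// Assumption: annotate it with @ApplicationScoped when used inside Quarkus.
class StatusBroadcaster<T : Any> {
    // Hot stream: an item is delivered to the subscribers present when it is pushed;
    // clients that connect later do not see earlier items.
    private val processor: BroadcastProcessor<T> = BroadcastProcessor.create()

    // Called from the REST endpoint for every incoming status change
    fun push(item: T) = processor.onNext(item)

    // Returned from the SSE endpoint; each client subscribes to this Multi
    fun stream(): Multi<T> = processor
}
```

    With this, the POST endpoint would call push(status) and the SSE endpoint would return stream(). If slow clients are a concern, an overflow strategy such as stream().onOverflow().drop() could be appended; whether dropping or buffering is appropriate depends on the application.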