Tags: jms, quarkus, smallrye

Bridge JMS Message to SSE Endpoint in Quarkus


In a Spring project, I used Sinks to emit events to an SSE endpoint, and it worked well. See:

https://github.com/hantsy/spring-graphql-sample/blob/master/dgs-subscription-sse/src/main/kotlin/com/example/demo/DataFetchers.kt#L33

But when I tried to use SmallRye Mutiny's MultiEmitterProcessor to achieve the same thing, it failed.

The example project is https://github.com/hantsy/quarkus-sandbox/tree/master/jms

    MultiEmitterProcessor<Message> emitterProcessor = MultiEmitterProcessor.create();

    void receive() {
        var consumer = jmsContext.createConsumer(helloQueue);
        consumer.setMessageListener(
                msg -> {
                    try {
                        var received = jsonb.fromJson(msg.getBody(String.class), Message.class);
                        LOGGER.log(Level.INFO, "consuming message: {0}", received);
                        emitterProcessor.emit(received);
                    } catch (JMSException e) {
                        throw new RuntimeException(e);
                    }
                }
        );
    }

And in the Resource class,

    @GET
    @Produces(MediaType.SERVER_SENT_EVENTS)
    @RestStreamElementType(MediaType.APPLICATION_JSON)
    public Multi<Message> stream() {
        // see: https://github.com/quarkusio/quarkus/issues/35220
        return handler.emitterProcessor.toMulti().toHotStream();
    }
  1. Is MultiEmitterProcessor suitable for use as a hot stream?

  2. Or is there something like Reactor's ConnectableFlux that lets me connect to a hot stream manually?


Solution

  • In the end, I moved the following call out of the Resource class

    emitterProcessor.toMulti()

    into a method in the handler class, which shares the stream with broadcast().toAllSubscribers() instead of toHotStream(). Then it worked.

    public Multi<Message> stream() {
        return processor.toMulti().broadcast().toAllSubscribers();
    }

    See the final working example: https://github.com/hantsy/quarkus-sandbox/tree/master/jms
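
To illustrate what "hot" means for a broadcast stream like the one above, here is a minimal sketch that uses only the JDK's java.util.concurrent.SubmissionPublisher. It is not Mutiny and not the code from the linked project; it is a stand-in that shows the same idea: one shared publisher pushes each item to every subscriber attached at that moment, and a subscriber that attaches later misses the items sent earlier. The class name HotStreamSketch, the helper CollectingSubscriber, and the sample items are hypothetical, chosen for illustration only.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a JDK-only analogue of a hot, broadcast stream.
class HotStreamSketch {

    /** Records every item it receives and releases a latch when the stream ends. */
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> items = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // ask for everything, no backpressure
        }

        @Override
        public void onNext(String item) {
            items.add(item);
        }

        @Override
        public void onError(Throwable failure) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }

        /** Waits for completion, then returns a snapshot of the received items. */
        List<String> awaitItems() {
            try {
                if (!done.await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("stream did not complete in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return List.copyOf(items);
        }
    }

    /** Returns the items seen by the two early subscribers and the late one, in that order. */
    static List<List<String>> run() {
        var early1 = new CollectingSubscriber();
        var early2 = new CollectingSubscriber();
        var late = new CollectingSubscriber();

        // One shared publisher, comparable to a single processor kept in the handler.
        try (var publisher = new SubmissionPublisher<String>()) {
            publisher.subscribe(early1);
            publisher.subscribe(early2);
            publisher.submit("first");   // delivered to early1 and early2 only
            publisher.subscribe(late);   // attaches after "first" was sent
            publisher.submit("second");  // delivered to all three subscribers
        } // close() signals onComplete to every subscriber

        return List.of(early1.awaitItems(), early2.awaitItems(), late.awaitItems());
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [[first, second], [first, second], [second]]
    }
}
```

The point of the sketch is that the publisher is created once and shared. An SSE client that connects late sees only the messages that arrive after it connects, which is the behavior expected when a single shared stream feeds several SSE clients.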