Search code examples
springspring-bootrabbitmqstomp

Cannot connect to rabbitmq STOMP from Spring boot


I have used the RabbitMQ docker image which has STOMP enabled. With the following configuration, when I try to run my Spring Boot Application, I am getting an exception.

StackTrace:

2020-11-21 16:03:07.620 INFO 28504 --- [ient-loop-nio-1] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection failure in session system: Failed to connect: Connection refused: /127.0.0.1:61613

io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /127.0.0.1:61613 Caused by: java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_242] at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:714) ~[na:1.8.0_242] at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final] at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]

Dockerfile

FROM rabbitmq:3-management
RUN rabbitmq-plugins enable --offline rabbitmq_stomp
EXPOSE 61613

The logs from Rabbitmq container looks fine to me.

enter image description here

WebSocketConfig.java looks like:

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/ws-connection")
            .setAllowedOrigins("*")
            .withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/topic", "/queue")
            .setRelayPort(61613)
            .setRelayHost("127.0.0.1")
            .setClientPasscode("guest")
            .setClientLogin("guest");
    registry.setApplicationDestinationPrefixes("/ws");
}
}

pom.xml

    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

What's wrong with the configuration? Can anyone help me?


Solution

  • I think you made a mistake while exposing the rabbitmq stomp port 61613 for the client. By the way, I tested with a similar configuration it works for me.

    For implementation please check my demo application on GitHub or read the following details.

    Dockerfile

    FROM rabbitmq:3-management
    
    RUN rabbitmq-plugins enable --offline rabbitmq_stomp
    
    EXPOSE 15671 15672 61613
    

    Server Implementation

    Message Contract

    public class ZbytesMessage {
    
        private String from;
        private String text;
    
        ...getters and setters...
    
    }
    

    WebSocket Configuration

    @Configuration
    @EnableWebSocketMessageBroker
    public class StompConfig implements WebSocketMessageBrokerConfigurer {
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            registry.addEndpoint("/zsockets")
                    .setAllowedOrigins("*").withSockJS();
        }
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry config) {
            config.enableStompBrokerRelay("/topic", "/queue")
                    .setRelayHost("localhost")
                    .setRelayPort(61613)
                    .setClientLogin("guest")
                    .setClientPasscode("guest");
            config.setApplicationDestinationPrefixes("/zbytes");
        }
    }
    

    Web Controller

    @Controller
    public class ZbytesController {
    
        private static final Logger LOG = LoggerFactory.getLogger(ZbytesController.class);
    
        @MessageMapping("/hello")
        @SendTo("/topic/greetings")
        public ZbytesMessage greeting(ZbytesMessage msg) throws Exception {
            Thread.sleep(1000); // simulated delay
            LOG.info("Received : {} from: {} ", msg.getText(), msg.getFrom());
            return msg;
        }
    }
    

    Server Runner

    @SpringBootApplication
    public class ServerRunner {
    
        public static void main(String[] args) {
            SpringApplication.run(ServerRunner.class, args);
        }
    
    }
    

    Client Implementation

    public class HelloClient {
    
        private static final WebSocketHttpHeaders headers = new WebSocketHttpHeaders();
        private static final Logger LOG = LoggerFactory.getLogger(HelloClient.class);
    
        public static void main(String[] args) throws Exception {
            HelloClient helloClient = new HelloClient();
    
            ListenableFuture<StompSession> f = helloClient.connect();
            StompSession stompSession = f.get();
    
            LOG.info("Subscribing to greeting topic using session {}", stompSession);
            helloClient.subscribeGreetings(stompSession);
    
            LOG.info("Sending hello message {}", stompSession);
            helloClient.sendHello(stompSession);
            Thread.sleep(60000);
        }
    
        public ListenableFuture<StompSession> connect() {
    
            Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
            List<Transport> transports = Collections.singletonList(webSocketTransport);
    
            SockJsClient sockJsClient = new SockJsClient(transports);
            sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
    
            WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
    
            String url = "ws://{host}:{port}/zsockets";
            return stompClient.connect(url, headers, new MyHandler(), "localhost", 8080);
        }
    
        public void subscribeGreetings(StompSession stompSession) {
            stompSession.subscribe("/topic/greetings", new StompFrameHandler() {
    
                public Type getPayloadType(StompHeaders stompHeaders) {
                    return byte[].class;
                }
    
                public void handleFrame(StompHeaders stompHeaders, Object o) {
                    LOG.info("Received greeting {}", new String((byte[]) o));
                }
            });
        }
    
        public void sendHello(StompSession stompSession) {
            String jsonHello = "{ \"from\" : \"suraj\", \"text\" : \"Hi zbytes!\" }";
            stompSession.send("/zbytes/hello", jsonHello.getBytes());
        }
    
        private static class MyHandler extends StompSessionHandlerAdapter {
            @Override
            public void afterConnected(StompSession stompSession, StompHeaders stompHeaders) {
                LOG.info("Now connected");
            }
        }
    
    }
    

    To Run

    • Build the docker image and run it (don't forget to expose port 61613). (Note: I would prefer docker-compose.yaml)
    docker build -t zbytes/rabbitmq .
    docker run -p61613:61613 zbytes/rabbitmq
    
    • Run ServerRunner java main class.
    • Run HelloClient java main class.

    Server Output

    i.g.zbytes.demo.server.ZbytesController  : Received : Hi zbytes! from: suraj 
    

    Client Output

    Received greeting {"from":"suraj","text":"Hi zbytes!"}