
Dynamic registration of websocket output adapter is not working


I am trying to register a ServerWebSocketContainer dynamically so that I can publish messages to different WebSocket endpoints at runtime.

Here is the server-side code:

@Service
public class WebSocketPublisherService {

    @Autowired
    IntegrationFlowContext integrationFlowContext;

    @Bean
    public HandshakeHandler handshakeHandler() {
        return new DefaultHandshakeHandler();
    }

    @Autowired
    RegularPublisher regularPublisher;

    public void startPublishing(String name) {
        ServerWebSocketContainer serverWebSocketContainer = new ServerWebSocketContainer(name)
                .setHandshakeHandler(handshakeHandler());

        MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
        methodInvokingMessageSource.setObject(regularPublisher);
        methodInvokingMessageSource.setMethodName("publishEmergency");

        WebSocketOutboundMessageHandler webSocketOutboundMessageHandler = new WebSocketOutboundMessageHandler(serverWebSocketContainer);
        webSocketOutboundMessageHandler.afterPropertiesSet();

        StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(methodInvokingMessageSource, polling -> polling.poller(pollerFactory -> pollerFactory.fixedRate(10000)))
                .split(new CustomMesssageSplitter(serverWebSocketContainer))
                .handle(webSocketOutboundMessageHandler)
                .get();

        integrationFlowContext.registration(standardIntegrationFlow)
                .addBean(serverWebSocketContainer)
                .register();

        standardIntegrationFlow.start();

    }
}


@Component
public class RegularPublisher {

    public String publishEmergency() {
        System.out.println("publishing message");
        return "This is Message from Presidenet!";
    }
}

And here is my client code. I am using okhttp 3.5.0 (groupId com.squareup.okhttp3):

OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
StringListner listener = new StringListner();
WebSocket ws = client.newWebSocket(request, listener);

// Trigger shutdown of the dispatcher's executor so this process can
// exit cleanly.
client.dispatcher().executorService().shutdown();

And here is the listener code:

public class StringListner extends WebSocketListener {

    private static final int NORMAL_CLOSURE_STATUS = 1000;

    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        System.out.println("Connection opened");
    }

    @Override
    public void onMessage(WebSocket webSocket, String text) {
        System.out.println("Receiving: " + text);
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        System.out.println("Receiving: " + bytes.hex());
    }

    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        webSocket.close(NORMAL_CLOSURE_STATUS, null);
        System.out.println("Closing: " + code + " " + reason);
    }

    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        t.printStackTrace();
    }
}

The dynamically registered endpoint is: /examples

The URL passed on the client side: ws://localhost:8080/examples

I get this error when the client connects to the WebSocket endpoint:

java.net.ProtocolException: Expected HTTP 101 response but was '404 '
    at okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:215)
    at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:182)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)


Solution

  • Runtime registration of server WebSocket endpoints is available only starting with Spring Integration 5.5: https://docs.spring.io/spring-integration/docs/current/reference/html/whats-new.html#x5.5-websocket.

    A test case for this feature: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-websocket/src/test/java/org/springframework/integration/websocket/dsl/WebSocketDslTests.java#L66

    UPDATE

    There is indeed a problem in the framework when the application uses not just plain Spring Integration but the whole of Spring Boot. I'm going to fix it on my side, but for now here is a workaround:

    1. Use only @SpringBootApplication. It brings in @EnableIntegration for us.

    2. Don't use @EnableWebSocket, since it overrides whatever Spring Integration would like to do. In other words, @EnableWebSocket is not compatible with Spring Integration WebSocket support (or vice versa).

    3. You must add this bean to your application context:

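       import org.springframework.beans.BeansException;
       import org.springframework.beans.factory.config.BeanPostProcessor;
       import org.springframework.context.annotation.Bean;
       import org.springframework.web.servlet.handler.AbstractHandlerMapping;

       // Declare this method in a configuration class (for example the @SpringBootApplication class).
       @Bean
       public static BeanPostProcessor integrationDynamicWebSocketHandlerMappingWorkaround() {
           return new BeanPostProcessor() {

               @Override
               public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
                   // Matched by simple class name; give the dynamic WebSocket mapping an explicit order of 0.
                   if (bean.getClass().getSimpleName().equals("IntegrationDynamicWebSocketHandlerMapping")) {
                       ((AbstractHandlerMapping) bean).setOrder(0);
                   }
                   return bean;
               }
           };
       }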

    The problem is that IntegrationDynamicWebSocketHandlerMapping comes with the default order, so it is registered after Spring Boot's default SimpleUrlHandlerMapping, which fails fast when there is no mapping for the requested URL. Setting the order to 0 lets the dynamic mapping be consulted before that one.
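
To illustrate why the order matters, here is a minimal plain-Java sketch of the dispatching idea. It is a simplified model, not Spring's actual classes: the `Mapping` interface, the `catchAll` and `dynamicWebSocket` factories, and the order value `Integer.MAX_VALUE - 1` are all hypothetical names and numbers chosen for illustration. It assumes that mappings are consulted in ascending order and that the first mapping claiming a path decides the response.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class HandlerMappingOrderSketch {

    /** Simplified stand-in for a handler mapping: an order plus a path lookup. */
    interface Mapping {
        int order();
        Optional<String> resolve(String path);
    }

    /** Hypothetical catch-all mapping: claims every path and answers 404. */
    static Mapping catchAll(final int orderValue) {
        return new Mapping() {
            public int order() { return orderValue; }
            public Optional<String> resolve(String path) { return Optional.of("404"); }
        };
    }

    /** Hypothetical dynamic WebSocket mapping: knows only the registered path. */
    static Mapping dynamicWebSocket(final int orderValue, final String registeredPath) {
        return new Mapping() {
            public int order() { return orderValue; }
            public Optional<String> resolve(String path) {
                // 101 stands for a successful WebSocket upgrade.
                return path.equals(registeredPath) ? Optional.of("101") : Optional.<String>empty();
            }
        };
    }

    /** Consults mappings in ascending order; the first one that claims the path wins. */
    static String dispatch(List<Mapping> mappings, String path) {
        List<Mapping> sorted = new ArrayList<>(mappings);
        sorted.sort(Comparator.comparingInt(Mapping::order));
        for (Mapping mapping : sorted) {
            Optional<String> status = mapping.resolve(path);
            if (status.isPresent()) {
                return status.get();
            }
        }
        return "404";
    }

    /** Status for /examples when the dynamic mapping has the given order. */
    static String statusForExamples(int dynamicMappingOrder) {
        List<Mapping> mappings = Arrays.asList(
                catchAll(Integer.MAX_VALUE - 1),
                dynamicWebSocket(dynamicMappingOrder, "/examples"));
        return dispatch(mappings, "/examples");
    }

    public static void main(String[] args) {
        // Dynamic mapping sorted last: the catch-all answers first.
        System.out.println(statusForExamples(Integer.MAX_VALUE)); // prints 404
        // Dynamic mapping moved to order 0: it is consulted first.
        System.out.println(statusForExamples(0));                 // prints 101
    }
}
```

In this model the only change between the two calls is the order of the dynamic mapping, which mirrors what the BeanPostProcessor workaround does with setOrder(0).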