I am trying to implement dynamic registration of ServerWebSocketContainer so that i can publish messages to different websocket endpoints at runtime.
Here is server side code
@Service
public class WebSocketPublisherService {
@Autowired
IntegrationFlowContext integrationFlowContext;
@Bean
public HandshakeHandler handshakeHandler() {
return new DefaultHandshakeHandler();
}
@Autowired
RegularPublisher regularPublisher;
public void startPublishing(String name) {
ServerWebSocketContainer serverWebSocketContainer = new ServerWebSocketContainer(name)
.setHandshakeHandler(handshakeHandler());
MethodInvokingMessageSource methodInvokingMessageSource = new MethodInvokingMessageSource();
methodInvokingMessageSource.setObject(regularPublisher);
methodInvokingMessageSource.setMethodName("publishEmergency");
WebSocketOutboundMessageHandler webSocketOutboundMessageHandler = new WebSocketOutboundMessageHandler(serverWebSocketContainer);
webSocketOutboundMessageHandler.afterPropertiesSet();
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(methodInvokingMessageSource, polling -> polling.poller(pollerFactory -> pollerFactory.fixedRate(10000)))
.split(new CustomMesssageSplitter(serverWebSocketContainer))
.handle(webSocketOutboundMessageHandler)
.get();
integrationFlowContext.registration(standardIntegrationFlow)
.addBean(serverWebSocketContainer)
.register();
standardIntegrationFlow.start();
}
}
@Component
public class RegularPublisher {
public String publishEmergency() {
System.out.println("publishing message");
return "This is Message from Presidenet!";
}
}
And here is my client code. I am using okhttp 3.5.0 from com.squareup.okhttp3 groupId
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
StringListner listener = new StringListner();
WebSocket ws = client.newWebSocket(request, listener);
// Trigger shutdown of the dispatcher's executor so this process can
// exit cleanly.
client.dispatcher().executorService().shutdown();
And here is listner code
public class StringListner extends WebSocketListener {
private static final int NORMAL_CLOSURE_STATUS = 1000;
@Override
public void onOpen(WebSocket webSocket, Response response) {
System.out.println("Connection opened");
}
@Override
public void onMessage(WebSocket webSocket, String text) {
System.out.println("Receiving: " + text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
System.out.println("Receiving: " + bytes.hex());
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
webSocket.close(NORMAL_CLOSURE_STATUS, null);
System.out.println("Closing: " + code + " " + reason);
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
t.printStackTrace();
}
}
Dynamic registered topic is : /examples
Client side passed URL: ws://localhost:8080/examples
I am getting this error while connecting from client to websocket.
java.net.ProtocolException: Expected HTTP 101 response but was '404 '
at okhttp3.internal.ws.RealWebSocket.checkResponse(RealWebSocket.java:215)
at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:182)
at okhttp3.RealCall$AsyncCall.execute(RealCall.java:135)
at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
The runtime server socket endpoints registration is available only starting with Spring Integration 5.5
version: https://docs.spring.io/spring-integration/docs/current/reference/html/whats-new.html#x5.5-websocket.
The test-case on the matter looks like: https://github.com/spring-projects/spring-integration/blob/main/spring-integration-websocket/src/test/java/org/springframework/integration/websocket/dsl/WebSocketDslTests.java#L66
UPDATE
There is indeed a problem in the framework, when we have not only plain Spring Integration, but whole Spring Boot. I'm going to fix it somehow on my side, but for now here is a workaround:
Use only a @SpringBootApplication
. It brings for us @EnableIntegration
.
Don't use @EnableWebSocket
since that one is going to override whatever Spring Integration would like to do. In other words @EnableWebSocket
is not compatible with Spring Integration Websocket support. (Or wise versa)
You must this bean into your application context:
@Bean
public static BeanPostProcessor integrationDynamicWebSocketHandlerMappingWorkaround() {
return new BeanPostProcessor() {
@Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean.getClass().getSimpleName().equals("IntegrationDynamicWebSocketHandlerMapping")) {
((AbstractHandlerMapping) bean).setOrder(0);
}
return bean;
}
};
}
The problem is that IntegrationDynamicWebSocketHandler
comes with a default order
and it is added already after default Spring Boot's SimpleUrlHandlerMapping
which fails fast when no mapping for requested URL.