I want to create a WebSocket source (for spring-cloud-stream-app-starters), which is currently not available on GitHub.
I went through some of the available sources but got confused, maybe because I'm not familiar with the framework.
Can I just create a Spring Boot application with a Source binding
and return the packets received from WebSocket clients in an @InboundChannelAdapter(value = Source.OUTPUT)
annotated method?
Also, how can I use WebSocketInboundChannelAdapter to start a WebSocket server and push the packets to the underlying broker?
You can get some ideas in the Reference Manual.
The WebSocketInboundChannelAdapter
is an event-driven channel adapter, not a pollable source. So all you need is a @Bean
for it with an appropriate reference to Source.OUTPUT.
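To make the polled vs. event-driven distinction concrete, here is a plain-Java sketch with no Spring types at all. Every name in it (EventDrivenVsPolled, PolledSource, EventDrivenSource, onPacket) is hypothetical and only illustrates the two models; it is not how the adapters are implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Plain-Java illustration only; none of these types exist in Spring Integration.
class EventDrivenVsPolled {

    // Polled style: something must repeatedly ask the source for the next item
    // (the model behind an @InboundChannelAdapter annotated method).
    static class PolledSource {
        private final Queue<String> buffer = new ArrayDeque<>();

        void buffer(String packet) {
            buffer.add(packet);
        }

        // Returns null when nothing is buffered, so the poller has nothing to send.
        String receive() {
            return buffer.poll();
        }
    }

    // Event-driven style: the source pushes each packet to its output as soon
    // as it arrives (the model behind WebSocketInboundChannelAdapter).
    static class EventDrivenSource {
        private final Consumer<String> output;

        EventDrivenSource(Consumer<String> output) {
            this.output = output;
        }

        void onPacket(String packet) {
            output.accept(packet);
        }
    }

    public static void main(String[] args) {
        List<String> channel = new ArrayList<>();

        PolledSource polled = new PolledSource();
        polled.buffer("foo");
        // Nothing reaches the channel until a poller calls receive().
        String polledPacket = polled.receive();
        if (polledPacket != null) {
            channel.add(polledPacket);
        }

        // The packet reaches the channel immediately, with no poller involved.
        EventDrivenSource eventDriven = new EventDrivenSource(channel::add);
        eventDriven.onPacket("bar");

        System.out.println(channel);
    }
}
```

With a WebSocket source the second model is the natural fit: packets arrive whenever clients send them, so there is nothing useful for a poller to do.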
The WebSocketInboundChannelAdapter
doesn't start the server. That is the responsibility of the ServerWebSocketContainer:
/**
 * The {@link IntegrationWebSocketContainer} implementation for the {@code server}
 * {@link org.springframework.web.socket.WebSocketHandler} registration.
 * <p>
 * Registers an internal {@code IntegrationWebSocketContainer.IntegrationWebSocketHandler}
 * for provided {@link #paths} with the {@link WebSocketHandlerRegistry}.
 * <p>
 * The real registration is based on Spring Web-Socket infrastructure via {@link WebSocketConfigurer}
 * implementation of this class.
 *
 * @author Artem Bilan
 * @author Gary Russell
 * @since 4.1
 */
public class ServerWebSocketContainer extends IntegrationWebSocketContainer
        implements WebSocketConfigurer, SmartLifecycle {
There is documentation on the matter as well.
There is also a stomp-chat sample that demonstrates the server behavior.
I don't think you need an "underlying broker" in this kind of source
application: you just receive messages over WebSocket and publish them to Source.OUTPUT.
Why do you need a STOMP broker here?
UPDATE
I have just tested this code against the Rabbit binder:
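import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.websocket.IntegrationWebSocketContainer;
import org.springframework.integration.websocket.ServerWebSocketContainer;
import org.springframework.integration.websocket.inbound.WebSocketInboundChannelAdapter;

@SpringBootApplication
@EnableBinding(Source.class)
public class CloudStreamWebSocketSourceApplication {

    // Event-driven adapter: every received WebSocket message goes to Source.OUTPUT.
    @Bean
    public WebSocketInboundChannelAdapter webSocketInboundChannelAdapter() {
        WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
                new WebSocketInboundChannelAdapter(serverWebSocketContainer());
        webSocketInboundChannelAdapter.setOutputChannelName(Source.OUTPUT);
        return webSocketInboundChannelAdapter;
    }

    // Registers the server-side WebSocket endpoint at /test, with SockJS enabled.
    @Bean
    public IntegrationWebSocketContainer serverWebSocketContainer() {
        return new ServerWebSocketContainer("/test")
                .withSockJs()
                .setAllowedOrigins("*");
    }

    public static void main(String[] args) {
        SpringApplication.run(CloudStreamWebSocketSourceApplication.class, args);
        System.out.println("Done");
    }
}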
My test case looks like this:
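import java.io.IOException;
import org.junit.Test;
import org.junit.runner.RunWith;
// Spring Boot 1.4/1.5 package; in Boot 2.x it is org.springframework.boot.web.server.LocalServerPort
import org.springframework.boot.context.embedded.LocalServerPort;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.websocket.ClientWebSocketContainer;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class CloudStreamWebSocketSourceApplicationTests {
    @LocalServerPort
    private String port;

    @Test
    public void testWebSocketStreamSource() throws IOException, InterruptedException {
        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
        ClientWebSocketContainer clientWebSocketContainer =
                new ClientWebSocketContainer(webSocketClient, "ws://localhost:" + this.port + "/test/websocket");
        clientWebSocketContainer.start();
        WebSocketSession session = clientWebSocketContainer.getSession(null);
        session.sendMessage(new TextMessage("foo"));
        // Keep the application alive long enough for the message to reach the binder.
        Thread.sleep(10000);
    }
}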
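A note on the client URL in that test: the server container is created with withSockJs(), and with SockJS enabled the plain WebSocket transport is exposed under the registered path plus /websocket. That is why the client connects to /test/websocket rather than /test. As a minimal sketch, the URL construction could be pulled into a helper like the one below; SockJsUrls and rawWebSocketUrl are hypothetical names of my own, not part of any Spring API.

```java
// Hypothetical helper, shown only to spell out how the test URL is assembled.
class SockJsUrls {

    // Builds the raw WebSocket URL for an endpoint registered with SockJS enabled:
    // ws://<host>:<port><path>/websocket
    static String rawWebSocketUrl(String host, int port, String path) {
        // Make sure the path starts with exactly one slash and has no trailing slash.
        String normalized = path.startsWith("/") ? path : "/" + path;
        if (normalized.endsWith("/")) {
            normalized = normalized.substring(0, normalized.length() - 1);
        }
        return "ws://" + host + ":" + port + normalized + "/websocket";
    }

    public static void main(String[] args) {
        // prints "ws://localhost:8080/test/websocket"
        System.out.println(rawWebSocketUrl("localhost", 8080, "/test"));
    }
}
```

If you drop withSockJs() from the server container, the client should connect to the registered path itself, without the /websocket suffix.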
These are my dependencies:
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-websocket</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>