Search code examples
spring-bootwebsocketspring-integrationspring-websocketspring-cloud-stream

How to create a custom source app for spring-cloud-stream-app-starters project


I want to create a web-socket source (for spring-cloud-stream-app-starters) which is currently not available on git hub.

I went through some of the available sources but had some confusions, may be because I'm not familiar with the framework.

Can I just create a spring boot application with Source binding and return the received packets from web-socket clients in an @InboundChannelAdapter(value = Source.OUTPUT) annotated method. ? Also how can I use WebSocketInboundChannelAdapter to start a websocket server and push the packets to the underlying broker.?


Solution

  • You can get some ideas in the Reference Manual.

    The WebSocketInboundChannelAdapter is an event-driven channel adapter, it's not pollable source. So, what you need is just a @Bean for this one and an appropriate reference to the Source.OUTPUT.

    The WebSocketInboundChannelAdapter doesn't start server. That is responsibility of the:

    /**
     * The {@link IntegrationWebSocketContainer} implementation for the {@code server}
     * {@link org.springframework.web.socket.WebSocketHandler} registration.
     * <p>
     * Registers an internal {@code IntegrationWebSocketContainer.IntegrationWebSocketHandler}
     * for provided {@link #paths} with the {@link WebSocketHandlerRegistry}.
     * <p>
     * The real registration is based on Spring Web-Socket infrastructure via {@link WebSocketConfigurer}
     * implementation of this class.
     *
     * @author Artem Bilan
     * @author Gary Russell
     * @since 4.1
     */
    public class ServerWebSocketContainer extends IntegrationWebSocketContainer
            implements WebSocketConfigurer, SmartLifecycle {
    

    We have a documentation on the matter as well.

    There is also a stomp-chat sample to demonstrate the server behavior.

    I think you don't need "underlying broker" in this kind of source application: you just receive messages over web socket and publish them to the Source.OUTPUT. Why do you need STOMP broker here?

    UPDATE

    Have just tested this code against Rabbit Binder:

    @SpringBootApplication
    @EnableBinding(Source.class)
    public class CloudStreamWebSocketSourceApplication {
    
        @Bean
        public WebSocketInboundChannelAdapter webSocketInboundChannelAdapter() {
            WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
                    new WebSocketInboundChannelAdapter(serverWebSocketContainer());
            webSocketInboundChannelAdapter.setOutputChannelName(Source.OUTPUT);
            return webSocketInboundChannelAdapter;
        }
    
        @Bean
        public IntegrationWebSocketContainer serverWebSocketContainer() {
            return new ServerWebSocketContainer("/test")
                    .withSockJs()
                    .setAllowedOrigins("*");
        }
    
        public static void main(String[] args) throws IOException {
            SpringApplication.run(CloudStreamWebSocketSourceApplication.class, args);
            System.out.println("Done");
        }
    
    }
    

    My test-case is like:

    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    public class CloudStreamWebSocketSourceApplicationTests {
    
        @LocalServerPort
        private String port;
    
        @Test
        public void testWebSocketStreamSource() throws IOException, InterruptedException {
            StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
    
            ClientWebSocketContainer clientWebSocketContainer =
                    new ClientWebSocketContainer(webSocketClient, "ws://localhost:" + this.port + "/test/websocket");
            clientWebSocketContainer.start();
    
            WebSocketSession session = clientWebSocketContainer.getSession(null);
    
            session.sendMessage(new TextMessage("foo"));
    
            Thread.sleep(10000);
        }
    
    }
    

    This is my dependencies:

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-websocket</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>