java spring spring-boot kotlin spring-integration-ip

How to handle socket open/close/error events using Spring Boot and Spring Integration IP


I'm using Spring Boot and the Spring Integration libraries to implement a custom TCP/IP protocol in my application.

Here is the configuration:

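import java.util.concurrent.TimeUnit
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.integration.annotation.ServiceActivator
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.ip.dsl.Tcp
import org.springframework.integration.ip.tcp.serializer.TcpCodecs
import org.springframework.stereotype.Component

@EnableIntegration
@Configuration
class TcpServerConfiguration {
    // Listening port, read from the tcp.server.port property
    @Value("\${tcp.server.port}")
    lateinit var port: Integer

    // NIO server gateway: each message is framed with a 4-byte length header
    @Bean
    fun inboundGateway(controller: TcpController): IntegrationFlow {
        return IntegrationFlow.from(Tcp.inboundGateway(Tcp.nioServer(port.toInt())
            .leaveOpen(true)
            // Socket timeout of one minute, expressed in milliseconds
            .soTimeout(TimeUnit.MILLISECONDS.convert(1L, TimeUnit.MINUTES).toInt())
            .soTcpNoDelay(true)
            .directBuffers(true)
            .deserializer(TcpCodecs.lengthHeader4())
            .serializer(TcpCodecs.lengthHeader4()).get()))
            .handle(controller)
            .get()
    }
}

@Component
class TcpController {
    // Receives the payload of each decoded packet
    @ServiceActivator
    fun onPacketReceived(data: ByteArray) {
        val readable = String(data)
        println(readable)
    }
}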
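
For anyone writing a client against this configuration: `TcpCodecs.lengthHeader4()` frames every message with a 4-byte length prefix in network (big-endian) byte order, followed by the payload. A minimal sketch of that framing using only the JDK is shown below; the function names `frame` and `readFrame` are made up for illustration and are not part of Spring Integration.

```kotlin
import java.io.DataInputStream
import java.nio.ByteBuffer

// Hypothetical helper: prefix the payload with its length as a
// 4-byte big-endian integer (ByteBuffer is big-endian by default).
fun frame(payload: ByteArray): ByteArray =
    ByteBuffer.allocate(4 + payload.size)
        .putInt(payload.size)
        .put(payload)
        .array()

// Hypothetical helper: read one length-prefixed message from a stream.
fun readFrame(input: DataInputStream): ByteArray {
    val length = input.readInt()      // reads the 4-byte big-endian header
    val payload = ByteArray(length)
    input.readFully(payload)          // blocks until the whole payload is read
    return payload
}
```

For example, `frame("hello".toByteArray())` produces 9 bytes: `00 00 00 05` followed by the five ASCII bytes of `hello`. Writing that array to a plain `java.net.Socket` connected to the server port should be enough to reach `onPacketReceived`.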

I can communicate with my server, but I would also like to listen to the events published by the TCP server. For example, I would like to send a message to each incoming connection as soon as it opens. However, I can't catch the events; instead, I get log messages like this:

o.s.i.i.tcp.connection.TcpNioConnection  : No publisher available to publish TcpConnectionOpenEvent [source=TcpNioConnection:127.0.0.1:50408:4561:672dbfcb-d560-46d2-8edb-603f5557cfc1], [factory=unknown, connectionId=127.0.0.1:50408:4561:672dbfcb-d560-46d2-8edb-603f5557cfc1] **OPENED**

Unfortunately, the following handler is never reached and the error keeps appearing:

    @EventListener
    fun onSocketConnect(event: TcpConnectionOpenEvent) {
        println("welcome ${event.connectionId}")
    }

Solution

  • Adding a simple component that implements ApplicationEventPublisher solved it. All listeners now work as expected:

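    import java.net.InetSocketAddress
    import org.springframework.context.ApplicationEventPublisher
    import org.springframework.context.event.EventListener
    import org.springframework.integration.ip.tcp.connection.TcpConnectionCloseEvent
    import org.springframework.integration.ip.tcp.connection.TcpConnectionExceptionEvent
    import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent
    import org.springframework.integration.ip.tcp.connection.TcpNioConnection
    import org.springframework.stereotype.Component

    @Component
    class TcpController : ApplicationEventPublisher {
        // ApplicationEventPublisher declares an abstract publishEvent(Any);
        // its override is not shown in this snippet.

        // Called when a client connects
        @EventListener
        fun onSocketConnect(event: TcpConnectionOpenEvent) {
            println("Opened ${event.connectionId}")
            val con = event.source as TcpNioConnection
            val ip = (con.socketInfo.remoteSocketAddress as InetSocketAddress).address.hostAddress

            println("IP: $ip")
        }

        // Called when a connection is closed
        @EventListener
        fun onSocketClose(event: TcpConnectionCloseEvent) {
            println("Closed ${event.connectionId}")
        }

        // Called when an exception is raised on a connection
        @EventListener
        fun onSocketError(event: TcpConnectionExceptionEvent) {
            println("Error ${event.connectionId}: ${event.cause}")
        }
    }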

    I hope it helps!
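
Once the open event reaches a listener, the original goal of greeting each incoming connection could be sketched roughly as follows. This is an untested sketch, not part of the solution above: it assumes the event source can be cast to `TcpConnection` and that calling `send` directly from the listener is acceptable in your setup. The class name `WelcomeSender` and the payload are made up for illustration.

```kotlin
import org.springframework.context.event.EventListener
import org.springframework.integration.ip.tcp.connection.TcpConnection
import org.springframework.integration.ip.tcp.connection.TcpConnectionOpenEvent
import org.springframework.messaging.support.GenericMessage
import org.springframework.stereotype.Component

// Hypothetical component: sends a greeting to every newly opened connection.
@Component
class WelcomeSender {
    @EventListener
    fun onSocketConnect(event: TcpConnectionOpenEvent) {
        // Assumption: the event source is the connection that was just opened.
        val connection = event.source as TcpConnection
        // The payload goes through the connection's serializer, so with
        // lengthHeader4 the client receives a 4-byte length followed by the bytes.
        connection.send(GenericMessage("welcome".toByteArray()))
    }
}
```

If sending directly from the listener causes problems, an alternative worth checking in the reference documentation is to send the message through an outbound channel adapter that shares the server connection factory, with the `ip_connectionId` header set to `event.connectionId`.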