Search code examples
javaspringspring-integrationspring-integration-dsl

How to properly configure a TCP inboundAdapter with QueueChannel and ServiceActivator


I am trying to configure a TCP socket that receives data in the format name,value in distinct messages. Those messages arrive on average every second, sometimes faster or sometimes slower.

I was able to set up a working configuration but I am lacking a basic understanding of what actually is happening in Spring Integration.

My configuration file looks like this:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService,
        @Value("${tcp.socket.server.port}") final int port
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                Tcp.nioServer(port)
                   .deserializer(serializer())
                   .leaveOpen(true)
            )
               .autoStartup(true)
               .outputChannel(queueChannel())
        ).transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller()
    {
        return Pollers.fixedDelay(50, TimeUnit.MILLISECONDS).get();
    }

    @Bean
    public MessageChannel queueChannel()
    {
        return MessageChannels.queue("queue", 50).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

And the CSVProcessingService looks like this (abbreviated):

@Slf4j
@Service
public class CSVProcessingService
{
    @ServiceActivator
    public void process(final String message)
    {
        log.debug("DATA RECEIVED: \n" + message);
        final CsvMapper csvMapper = new CsvMapper();
        final CsvSchema csvSchema = csvMapper.schemaFor(CSVParameter.class);

        if (StringUtils.contains(message, StringUtils.LF))
        {
            processMultiLineInput(message, csvMapper, csvSchema);
        }
        else
        {
            processSingleLineInput(message, csvMapper, csvSchema);
        }
    }
}

My goals for this configuration are the following:

  • receive messages on the configured port
  • withstand a higher load without losing messages
  • deserialize the messages
  • put them into the queue channel
  • (ideally also log errors)
  • the queue channel is polled every 50 ms and the message from the queue channel passed to the ObjectToStringTransformer
  • after the transformer the converted message is passed to the CSVProcessingService for further processing

Did I achieve all those goals correctly or did I make a mistake because I misunderstood Spring Integration? Would it be possible to combine the Poller and the @ServiceActivator somehow?

Futhermore, I have a problem visualizing how my configured IntegrationFlow actually "flows", maybe somebody can help me to better understand this.

EDIT:

I reworked my configuration after Artems comment. It now look like this:

@Configuration
@EnableIntegration
public class TCPSocketServerConfig
{
    @Value("${tcp.socket.server.port}") int port;

    @Bean
    public IntegrationFlow server(
        final CSVProcessingService csvProcessingService
    )
    {
        return IntegrationFlows.from(
            Tcp.inboundAdapter(
                tcpNioServer()
            )
               .autoStartup(true)
               .errorChannel(errorChannel())
        )
         .transform(new ObjectToStringTransformer())
         .handle(csvProcessingService)
         .get();
    }

    @Bean
    public AbstractServerConnectionFactory tcpNioServer()
    {
        return Tcp.nioServer(port)
                  .deserializer(serializer())
                  .leaveOpen(true)
                  .taskExecutor(
                      new ThreadPoolExecutor(0, 20,
                                             30L, TimeUnit.SECONDS,
                                             new SynchronousQueue<>(),
                                             new DefaultThreadFactory("TCP-POOL"))
                  ).get();
    }

    @Bean
    public MessageChannel errorChannel()
    {
        return MessageChannels.direct("errors").get();
    }

    @Bean
    public IntegrationFlow errorHandling()
    {
        return IntegrationFlows.from(errorChannel()).log(LoggingHandler.Level.DEBUG).get();
    }

    @Bean
    public ByteArrayLfSerializer serializer()
    {
        final ByteArrayLfSerializer serializer = new ByteArrayLfSerializer();

        serializer.setMaxMessageSize(10240);

        return serializer;
    }
}

I also removed the @ServiceActivator annotation form the CSVProcessingService#process method.


Solution

  • Not sure what confuses you, but your configuration and logic looks good.

    You may miss the fact that you don't need a QueueChannel in between, since an AbstractConnectionFactory.processNioSelections() is already multi-threaded and it schedules a task to read a message from the socket. So, you only need is to configure an appropriate Executor for Tcp.nioServer(). Although it is an Executors.newCachedThreadPool() by default anyway.

    On the other hand with in-memory QueueChannel you indeed may lose messages because they are already read from the network.

    When you do Java DSL, you should consider to use poller() option on the endpoint. The @Poller will work on the @ServiceActivator if you have inputChannel attribute over there, but using the same in the handle() will override that inputChannel, so your @Poller won't be applied. Don't confuse yourself with mixing Java DSL and annotation configuration!

    Everything else is good in your configuration.