
sendTimeout appears to be ignored in SourcePollingChannelAdapterSpec vs. creating adapters manually


When I create an IntegrationFlow, the sendTimeout() value I set on the poller for SourcePollingChannelAdapterSpec is ignored, which causes the polling thread to block. This does not happen if I create a SourcePollingChannelAdapter programmatically. For example, programmatically with a @Configuration class:

@Configuration
public class JdbcPollingConfig {

    @Bean
    protected MessageChannel jdbcInboundChannel() {
        return new QueueChannel(1);
    }

    @Bean
    public ThreadPoolTaskScheduler jdbcTaskExecutor() {
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.setPoolSize(1);
        taskScheduler.initialize();
        return taskScheduler;
    }

    @Bean
    protected JdbcPollingChannelAdapter jdbcPollingChannelAdapter(final DataSource dataSource) {
        final JdbcPollingChannelAdapter adapter =
                new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
        return adapter;
    }

    @Bean
    protected SourcePollingChannelAdapter sourcePollingChannelAdapter(
            final JdbcPollingChannelAdapter adapter,
            final MessageChannel jdbcInboundChannel,
            final ThreadPoolTaskScheduler jdbcTaskExecutor) {
        final SourcePollingChannelAdapter spcAdapter = new SourcePollingChannelAdapter();
        spcAdapter.setSource(adapter);
        spcAdapter.setOutputChannel(jdbcInboundChannel);
        spcAdapter.setSendTimeout(500); // Sets the send-to-channel timeout - throws exception on timeout though.
        spcAdapter.setTaskScheduler(jdbcTaskExecutor);
        spcAdapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(2L)));
        return spcAdapter;
    }

}

Please note that CustomJdbcPollingChannelAdapter is my own class. It extends JdbcPollingChannelAdapter only so that I could override the doReceive() method with some logging calls:

@Log4j2
public class CustomJdbcPollingChannelAdapter extends JdbcPollingChannelAdapter {
    public CustomJdbcPollingChannelAdapter(DataSource dataSource, String selectQuery) {
        super(dataSource, selectQuery);
    }

    public CustomJdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery) {
        super(jdbcOperations, selectQuery);
    }

    @Override
    protected Object doReceive() {
        log.info("Jdbc Polling.");
        return super.doReceive();
    }
}

With the queue channel configured with a capacity of 1, this configuration throws an exception when the send times out, as anticipated, because nothing consumes from the channel jdbcInboundChannel. Contrast this with an IntegrationFlow configuration:

@Configuration
public class JdbcDSLConfig {

    @Bean
    public QueueChannelSpec jdbcInboundChannel() {
        return MessageChannels.queue(1);
    }

    @Bean
    public MessageSource<Object> jdbcMessageSource(final DataSource dataSource) {
        return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
    }

    @Bean
    public ThreadPoolTaskExecutor jdbcExecutor() {
        final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1);
        taskExecutor.setMaxPoolSize(1);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    public IntegrationFlow jdbcInboundFlow(final MessageSource<Object> jdbcMessageSource,
                                           final QueueChannelSpec jdbcInboundChannel,
                                           @Qualifier("jdbcExecutor") final ThreadPoolTaskExecutor jdbcExecutor) {
        return IntegrationFlow.from(jdbcMessageSource,
                        c -> c.poller(Pollers
                                .fixedDelay(2000)
                                .sendTimeout(1) // this has no effect.
                                .taskExecutor(jdbcExecutor)))
                .channel(jdbcInboundChannel)
                .get();

    }
}

This configuration blocks on the second poll of the database, even though a sendTimeout is set on the poller.
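
The difference between the two behaviours can be illustrated without Spring at all. The following is only a sketch, assuming the queue channel behaves like a bounded java.util.concurrent.BlockingQueue with no consumer: a send with a positive timeout gives up once the queue is full, while a send without an effective timeout blocks the calling thread. The class SendTimeoutSketch and its methods are hypothetical names made up for this illustration and are not Spring Integration API.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SendTimeoutSketch {

    // Hypothetical stand-in for a channel send (assumption, not framework code):
    // positive timeout -> timed offer, zero -> immediate offer, negative -> block until space is free.
    static boolean send(BlockingQueue<String> queue, String message, long timeoutMs)
            throws InterruptedException {
        if (timeoutMs > 0) {
            return queue.offer(message, timeoutMs, TimeUnit.MILLISECONDS);
        }
        if (timeoutMs == 0) {
            return queue.offer(message);
        }
        queue.put(message);
        return true;
    }

    // Runs an untimed send on a separate thread and reports whether it is still stuck after waitMs.
    static boolean blocksOnUntimedSend(BlockingQueue<String> queue, String message, long waitMs)
            throws InterruptedException {
        Thread sender = new Thread(() -> {
            try {
                send(queue, message, -1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.setDaemon(true);
        sender.start();
        sender.join(waitMs);
        boolean stillBlocked = sender.isAlive();
        sender.interrupt(); // release the stuck sender thread
        return stillBlocked;
    }

    public static void main(String[] args) throws InterruptedException {
        // Capacity 1 and no consumer, mirroring the queue channel in the question.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        System.out.println("first send accepted: " + send(queue, "first poll", 500));
        System.out.println("second timed send accepted: " + send(queue, "second poll", 500));
        System.out.println("untimed send blocked: " + blocksOnUntimedSend(queue, "second poll", 200));
    }
}
```

In this sketch the timed send corresponds to the manually built adapter, where the timeout is honoured and the failed send surfaces as an exception, and the untimed send corresponds to what the DSL flow appears to do when the configured sendTimeout is not applied.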


Solution

  • This is a good catch!

    In fact, PollerMetadata.getSendTimeout() is never used anywhere in the framework.

    Please raise a GitHub issue and we will address it shortly.