When I create an IntegrationFlow, the sendTimeout() value set on SourcePollingChannelAdapterSpec is ignored, causing the polling thread to block. This does not happen if I create a SourcePollingChannelAdapter programmatically. For example, here is the programmatic setup in a @Configuration class:
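    import java.time.Duration;
    import javax.sql.DataSource;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.channel.QueueChannel;
    import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
    import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
    import org.springframework.scheduling.support.PeriodicTrigger;

    @Configuration
    public class JdbcPollingConfig {

        // Queue channel with capacity 1 and no consumer, so it fills after the first message.
        @Bean
        protected MessageChannel jdbcInboundChannel() {
            return new QueueChannel(1);
        }

        // Single-threaded scheduler that runs the polling task.
        @Bean
        public ThreadPoolTaskScheduler jdbcTaskExecutor() {
            ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
            taskScheduler.setPoolSize(1);
            taskScheduler.initialize();
            return taskScheduler;
        }

        @Bean
        protected JdbcPollingChannelAdapter jdbcPollingChannelAdapter(final DataSource dataSource) {
            return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
        }

        @Bean
        protected SourcePollingChannelAdapter sourcePollingChannelAdapter(
                final JdbcPollingChannelAdapter adapter,
                final MessageChannel jdbcInboundChannel,
                final ThreadPoolTaskScheduler jdbcTaskExecutor) {
            final SourcePollingChannelAdapter spcAdapter = new SourcePollingChannelAdapter();
            spcAdapter.setSource(adapter);
            spcAdapter.setOutputChannel(jdbcInboundChannel);
            // Timeout for sending to the channel; an exception is thrown when it expires.
            spcAdapter.setSendTimeout(500);
            spcAdapter.setTaskScheduler(jdbcTaskExecutor);
            // Poll every 2 seconds.
            spcAdapter.setTrigger(new PeriodicTrigger(Duration.ofSeconds(2L)));
            return spcAdapter;
        }
    }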
Please note that CustomJdbcPollingChannelAdapter is my own class. It extends JdbcPollingChannelAdapter only so that I can override the doReceive() method and add a logging call:
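    import javax.sql.DataSource;
    import lombok.extern.log4j.Log4j2;
    import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
    import org.springframework.jdbc.core.JdbcOperations;

    @Log4j2
    public class CustomJdbcPollingChannelAdapter extends JdbcPollingChannelAdapter {

        public CustomJdbcPollingChannelAdapter(DataSource dataSource, String selectQuery) {
            super(dataSource, selectQuery);
        }

        public CustomJdbcPollingChannelAdapter(JdbcOperations jdbcOperations, String selectQuery) {
            super(jdbcOperations, selectQuery);
        }

        // Log every poll so that a stalled polling thread shows up in the log.
        @Override
        protected Object doReceive() {
            log.info("Jdbc Polling.");
            return super.doReceive();
        }
    }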
With the queue channel configured as above (capacity 1), this configuration throws an exception as anticipated, because there is no consumer of the channel jdbcInboundChannel.
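To illustrate why a send timeout produces a failure instead of a hang here, the following is a plain-Java sketch, not Spring Integration code. It assumes that a capacity-1 queue channel behaves like a bounded java.util.concurrent.BlockingQueue and that a send with a timeout corresponds to a timed offer(). The class name TimedSendSketch and its send() helper are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class TimedSendSketch {

    // Hypothetical stand-in for a send with a timeout:
    // returns false if the queue is still full when the timeout expires.
    static boolean send(BlockingQueue<String> channel, String payload, long timeoutMs) {
        try {
            return channel.offer(payload, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        // Capacity 1 and no consumer, as in the configuration above.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(1);

        boolean first = send(channel, "row-1", 500);   // the queue has room
        boolean second = send(channel, "row-2", 500);  // the queue is full, so this gives up after 500 ms

        System.out.println("first=" + first + ", second=" + second);
        if (!second) {
            // A caller can turn the failed send into an exception at this point.
            System.out.println("send timed out; the caller can report this as an exception");
        }
    }
}
```

The point of the sketch is only that the second send returns control to the caller after the timeout, which is what lets the polling thread surface an error and carry on with the next poll.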
Contrast this with an IntegrationFlow configuration:
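    import javax.sql.DataSource;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.MessageChannels;
    import org.springframework.integration.dsl.Pollers;
    import org.springframework.integration.dsl.QueueChannelSpec;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

    @Configuration
    public class JdbcDSLConfig {

        // Queue channel with capacity 1 and no consumer, as in the programmatic setup.
        @Bean
        public QueueChannelSpec jdbcInboundChannel() {
            return MessageChannels.queue(1);
        }

        @Bean
        public MessageSource<Object> jdbcMessageSource(final DataSource dataSource) {
            return new CustomJdbcPollingChannelAdapter(dataSource, "select dp_id, dp_payload from data_packets");
        }

        // Single-threaded executor that runs the polling task.
        @Bean
        public ThreadPoolTaskExecutor jdbcExecutor() {
            final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(1);
            taskExecutor.setMaxPoolSize(1);
            taskExecutor.initialize();
            return taskExecutor;
        }

        @Bean
        public IntegrationFlow jdbcInboundFlow(final MessageSource<Object> jdbcMessageSource,
                final QueueChannelSpec jdbcInboundChannel,
                @Qualifier("jdbcExecutor") final ThreadPoolTaskExecutor jdbcExecutor) {
            return IntegrationFlow.from(jdbcMessageSource,
                            c -> c.poller(Pollers
                                    .fixedDelay(2000)
                                    .sendTimeout(1) // this has no effect.
                                    .taskExecutor(jdbcExecutor)))
                    .channel(jdbcInboundChannel)
                    .get();
        }
    }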
This configuration blocks on the second poll of the database, even though a sendTimeout is set in the configuration.
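The blocking symptom can be reproduced outside the framework with a plain-Java sketch. It rests on two assumptions: that the capacity-1 queue channel behaves like a bounded java.util.concurrent.BlockingQueue, and that a send without an effective timeout corresponds to put(). The class name BlockingSendSketch and the pollerBlocks() helper are hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingSendSketch {

    // Returns true if the simulated poller thread is still stuck after waitMs.
    static boolean pollerBlocks(long waitMs) {
        // Capacity 1 and no consumer, as in the flow above.
        BlockingQueue<String> channel = new LinkedBlockingQueue<>(1);

        Thread poller = new Thread(() -> {
            try {
                channel.put("poll-1"); // first poll: the queue has room
                channel.put("poll-2"); // second poll: the queue is full, put() waits indefinitely
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.setDaemon(true);
        poller.start();

        try {
            poller.join(waitMs); // give the poller time to reach the second put()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        boolean blocked = poller.isAlive();
        poller.interrupt(); // release the stuck thread so the sketch can exit cleanly
        return blocked;
    }

    public static void main(String[] args) {
        System.out.println("poller blocked: " + pollerBlocks(300));
    }
}
```

With a single polling thread, one stuck send like this is enough to stop all further polls, which matches the behaviour described above.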
This is a good catch!
In fact, PollerMetadata.getSendTimeout() is never used in the framework.
Please raise a GH issue and we will address it shortly.