I swear I had this working, but when I can back to it after a few months (and an upgrade to Boot 1.5.9), I am having issues.
I set up a JdbcPollingChannelAdapter that I can do a receive() on just fine, but when I put the adapter in a flow that does nothing more than queue the result of the adapter, running .receive on the queue always returns a null (I can see in the console log that the adapter's SQL getting executed, though).
Tests below. Why can I get results from the adapter, but not queue the results? Thank you in advance for any assistance.
public class JdbcpollingchanneladapterdemoTests {
DataSource dataSource;
private static PollableChannel outputQueue;
public static void setupClass() {
outputQueue = MessageChannels.queue().get();
public void Should_HaveQueue() {
assertThat(outputQueue, instanceOf(QueueChannel.class));
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void Should_Not_HaveMessageOnTheQueue_When_No_DemosAreInTheDatabase() {
Message<?> message = outputQueue.receive(5000);
assertThat(message, nullValue()) ;
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void Should_HaveMessageOnTheQueue_When_DemosIsInTheDatabase() {
assertThat(outputQueue, instanceOf(QueueChannel.class));
Message<?> message = outputQueue.receive(5000);
assertThat(message, notNullValue());
assertThat(message.getPayload().toString(), equalTo("15317")) ;
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void get_message_directly_from_adapter() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
Message<?> message = adapter.receive();
assertThat(message, notNullValue());
private static class Demo {
private String demo;
String getDemo() {
return demo;
void setDemo(String value) {
this.demo = value;
public String toString() {
return "Demo [value=" + this.demo + "]";
public static class DemoRowMapper implements RowMapper<Demo> {
public Demo mapRow(ResultSet rs, int rowNum) throws SQLException {
Demo demo = new Demo();
return demo;
public static class MyFlowAdapter extends IntegrationFlowAdapter {
DataSource dataSource;
protected IntegrationFlowDefinition<?> buildFlow() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
return from(adapter,
c -> c.poller(Pollers.fixedRate(1000L, 2000L)
EDIT I've simplified it as much as I can, refactoring to code below. The test passes a flow with a generic message source, and fails on a flow with JdbcPollingChannelAdapter message source. It's just not evident to me how I should configure the second message source so that it will suceed like the first message source.
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
public void Should_HaveMessageOnTheQueue_When_UnsentDemosIsInTheDatabase() {
this.genericFlowContext.registration(new GenericFlowAdapter()).register();
PollableChannel genericChannel = this.beanFactory.getBean("GenericFlowAdapterOutput",
this.jdbcPollingFlowContext.registration(new JdbcPollingFlowAdapter()).register();
PollableChannel jdbcPollingChannel = this.beanFactory.getBean("JdbcPollingFlowAdapterOutput",
assertThat(genericChannel.receive(5000).getPayload(), equalTo("15317"));
assertThat(jdbcPollingChannel.receive(5000).getPayload(), equalTo("15317"));
private static class GenericFlowAdapter extends IntegrationFlowAdapter {
protected IntegrationFlowDefinition<?> buildFlow() {
return from(getObjectMessageSource(),
e -> e.poller(Pollers.fixedRate(100)))
.channel(c -> c.queue("GenericFlowAdapterOutput"));
private MessageSource<Object> getObjectMessageSource() {
return () -> new GenericMessage<>("15317");
private static class JdbcPollingFlowAdapter extends IntegrationFlowAdapter {
DataSource dataSource;
protected IntegrationFlowDefinition<?> buildFlow() {
return from(getObjectMessageSource(),
e -> e.poller(Pollers.fixedRate(100)))
.channel(c -> c.queue("JdbcPollingFlowAdapterOutput"));
private MessageSource<Object> getObjectMessageSource() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
return adapter;
Looks like you need to add @EnableIntegration
to your test configuration.
When you use Spring Boot slices for testing, not all auto-configurations are loaded:
The problem that JdbcPollingChannelAdapter
is run in the separate, scheduled thread, already out of the original transaction around test method, where those @Sql
s are performed.
The fix for you is like this:
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');",
config = @SqlConfig(transactionMode = SqlConfig.TransactionMode.ISOLATED))
Pay attention to that SqlConfig.TransactionMode.ISOLATED
. This way the INSERT
transaction is committed and the data is available for that separate polling thread for the JdbcPollingChannelAdapter
Also pay attention that this JdbcPollingChannelAdapter
always returns a List
of records. So, your assertThat(jdbcPollingChannel.receive(5000).getPayload(), ...);
should be against a List<String>
even if there is only one record in the table.