java, spring-integration

How to poll data from DB using Spring Integration's JdbcPollingChannelAdapter for a certain duration, pass it to a listener through a channel?


I want to poll 100 messages from the DB every 120 seconds, for which I have written the following bean.

@Component
class AccountConfiguration {

@Autowired
@Qualifier("outChannel") // defined in spring xml configuration
private MessageChannel outChannel;


    @Bean
    @InboundChannelAdapter(value = "outChannel",
            poller = @Poller(fixedDelay = "120", maxMessagesPerPoll = "100"))
    public List<Account> getAccounts(DataSource dataSource) {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource);
        adapter.setRowMapper(new AccountMapper());
        Message<Object> result = adapter.receive();
        List<Account> list = (ArrayList) result.getPayload();
        return list;
    }
}

The above code retrieves rows from the DB. Now I want to pass this list to the listener below:

@Component
class AccountMessageListener {
 public void onMessage(List<Account> list){
     System.out.println("Message received");
 }
}

I am trying to call this listener as follows:

@Component
class AccountService{

@Autowired
@Qualifier("outChannel") // Autowiring the same channel here used above
private MessageChannel outChannel;

@Autowired
AccountMessageListener listener;

public void generateFile(String region){
   IntegrationFlows.from("outChannel").handle(listener,"onMessage").get();
}

}

@SpringBootApplication  
public class SpringBootExampleApplication   
{  
public static void main(String[] args)   
{  
ApplicationContext context = SpringApplication.run(SpringBootExampleApplication.class, args);  
AccountService service = context.getBean(AccountService.class);
  service.generateFile("ASIA");
}  
}  
    
// Below is from Spring xml 
<int:channel id="outChannel"/>
<bean id="messageHandler" class="com.account.AccountMessageListener"/>

My assumption is that when generateFile is invoked, outChannel will already hold the data sent to "outChannel" by the getAccounts bean in the AccountConfiguration class. But when generateFile is invoked, outChannel does not seem to have any data, so onMessage is never called.

My questions are: how can I pass data from JdbcPollingChannelAdapter -> outChannel -> onMessage of AccountMessageListener every 120 seconds for 2 hours? Also, is there a way to check the number of messages in a channel?


Solution

    1. Your @InboundChannelAdapter configuration is not correct. Since you are dealing with a JdbcPollingChannelAdapter, the getAccounts() bean method has to return that adapter itself:

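      @Bean
      @InboundChannelAdapter(value = "outChannel",
           // fixedDelay is in milliseconds: 120000 ms = 120 seconds
           poller = @Poller(fixedDelay = "120000", maxMessagesPerPoll = "100"))
      public JdbcPollingChannelAdapter getAccounts(DataSource dataSource) {
          // The constructor also requires a SELECT statement; this query is only a placeholder
          JdbcPollingChannelAdapter adapter =
                  new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM account");
          adapter.setRowMapper(new AccountMapper());
          return adapter;
      }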
      

    You must not call MessageSource.receive() yourself. The framework knows what to do with an @InboundChannelAdapter and how to configure and poll the provided bean. See the docs: https://docs.spring.io/spring-integration/reference/html/configuration.html#annotations_on_beans

    2. You don't need an XML config with Spring Boot: it is better to configure everything via @Configuration or @Component classes.

    3. Your usage of the Java DSL (an IntegrationFlow) is wrong. The IntegrationFlow has to be declared as a @Bean, and you must not call configuration-related methods yourself:

      @Bean
      public IntegrationFlow generateFile() { // no String parameter: Spring would look for a String bean to inject
           return IntegrationFlows.from("outChannel").handle(listener, "onMessage").get();
      }
      

    See docs about Java DSL: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl

    4. It is not clear what region is for, since it is never used.
    5. The @Autowired outChannel field is useless: you never use that property in your code.
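
    Taken together, the points above might look like the following sketch. It assumes the Account, AccountMapper and AccountMessageListener classes from the question and a Spring Integration 5.x version (5.1 or later for setMaxRows). The SELECT statement is a placeholder and the class name AccountIntegrationConfiguration is hypothetical. Note that maxMessagesPerPoll counts messages (receive() calls) per polling cycle, not rows, so the sketch limits rows on the adapter instead:

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jdbc.JdbcPollingChannelAdapter;
import org.springframework.messaging.MessageChannel;

// Hypothetical class name; Account, AccountMapper and AccountMessageListener
// are assumed to exist as in the question.
@Configuration
class AccountIntegrationConfiguration {

    // Replaces the <int:channel id="outChannel"/> XML definition.
    @Bean
    public MessageChannel outChannel() {
        return new DirectChannel();
    }

    // The framework polls this message source every 120000 ms (120 seconds)
    // and sends each result list to outChannel.
    @Bean
    @InboundChannelAdapter(value = "outChannel", poller = @Poller(fixedDelay = "120000"))
    public JdbcPollingChannelAdapter getAccounts(DataSource dataSource) {
        JdbcPollingChannelAdapter adapter =
                new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM account"); // placeholder query
        adapter.setRowMapper(new AccountMapper());
        adapter.setMaxRows(100); // at most 100 rows per query (assumes 5.1+)
        return adapter;
    }

    // Declared as a bean, so the framework subscribes the listener to outChannel.
    @Bean
    public IntegrationFlow accountFlow(AccountMessageListener listener) {
        return IntegrationFlows.from("outChannel")
                .handle(listener, "onMessage")
                .get();
    }
}
```

    As for counting messages: a DirectChannel hands each message straight to its subscriber and does not buffer anything, so there is nothing to count. A QueueChannel does buffer and exposes getQueueSize(), but its consumer then needs a poller of its own.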

    There might be other flaws and open questions: your current code needs a lot of cleanup and rework.
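
    The question also asks about polling every 120 seconds for 2 hours. Independently of Spring Integration, the timing can be sketched in plain Java as below. BoundedPoller and its methods are hypothetical names, not framework API, and a ScheduledExecutorService stands in for the framework's poller. The sketch stops after window / interval runs (60 for the question's numbers) rather than at a wall-clock deadline, so a slow task stretches the total duration:

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs a task with a fixed delay for a bounded number of polls. */
final class BoundedPoller {

    private BoundedPoller() {
    }

    /** Number of polls that fit into the window, e.g. 2 hours / 120 seconds = 60. */
    static long pollCount(Duration interval, Duration window) {
        if (interval.toMillis() <= 0) {
            throw new IllegalArgumentException("interval must be at least 1 ms");
        }
        return window.toMillis() / interval.toMillis();
    }

    /** Runs the task pollCount(interval, window) times, blocks until done, returns the run count. */
    static int runFor(Duration interval, Duration window, Runnable task) {
        long maxPolls = pollCount(interval, window);
        if (maxPolls <= 0) {
            return 0;
        }
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            if (runs.get() >= maxPolls) {
                return; // guard against a run that slips in before shutdown
            }
            try {
                task.run();
            } catch (RuntimeException e) {
                // An escaping exception would cancel all further runs, so log and continue.
                System.err.println("poll failed: " + e);
            }
            if (runs.incrementAndGet() >= maxPolls) {
                done.countDown();
            }
        }, 0, interval.toMillis(), TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt and stop early
        } finally {
            scheduler.shutdownNow();
        }
        return runs.get();
    }
}
```

    For the question's numbers the call would be BoundedPoller.runFor(Duration.ofSeconds(120), Duration.ofHours(2), task). In a Spring Integration application, a comparable effect could presumably be achieved by stopping the polling endpoint once the window has elapsed.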