I want to poll 100 messages from the DB every 120 seconds, for which I have written the following bean.
@Component
class AccountConfiguration {
    @Autowired
    @Qualifier("outChannel") // defined in spring xml configuration
    private MessageChannel outChannel;
    @Bean
    @InboundChannelAdapter(value = "outChannel",
            poller = @Poller(fixedDelay = "120", maxMessagesPerPoll = "100"))
    public List<Account> getAccounts(DataSource dataSource) {
        JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource);
        adapter.setRowMapper(new AccountMapper());
        Message<Object> result = adapter.receive();
        List<Account> list = (List<Account>) result.getPayload();
        return list;
    }
}
The above code retrieves rows from the DB. Now I want to pass this list to the listener below:
@Component
class AccountMessageListener {
    public void onMessage(List<Account> list) {
        System.out.println("Message received");
    }
}
I am trying to call the above listener as follows:
@Component
class AccountService {
    @Autowired
    @Qualifier("outChannel") // Autowiring the same channel here used above
    private MessageChannel outChannel;
    @Autowired
    AccountMessageListener listener;
    public void generateFile(String region) {
        IntegrationFlows.from("outChannel").handle(listener, "onMessage").get();
    }
}
@SpringBootApplication
public class SpringBootExampleApplication {
    public static void main(String[] args) {
        ApplicationContext context = SpringApplication.run(SpringBootExampleApplication.class, args);
        AccountService service = context.getBean(AccountService.class);
        service.generateFile("ASIA");
    }
}
// Below is from Spring xml
<int:channel id="outChannel"/>
<bean id="messageHandler" class="com.account.AccountMessageListener"/>
My assumption is that when generateFile is invoked, outChannel will already hold the data passed to "outChannel" by the getAccounts bean in the AccountConfiguration class. But when generateFile is invoked, it seems outChannel does not have any data, and so onMessage is not called.
My questions are: how can I pass data from JdbcPollingChannelAdapter -> outChannel -> onMessage of AccountMessageListener every 120 seconds for 2 hours? Also, is there a way to check the number of messages in the channel?
Your @InboundChannelAdapter configuration is not correct. Since you are dealing with a JdbcPollingChannelAdapter, that adapter itself has to be the result of the getAccounts() bean method:
@Bean
@InboundChannelAdapter(value = "outChannel",
        // fixedDelay is in milliseconds: 120 seconds = 120000
        poller = @Poller(fixedDelay = "120000", maxMessagesPerPoll = "100"))
public JdbcPollingChannelAdapter getAccounts(DataSource dataSource) {
    // note: the actual constructor also takes the SELECT query, which the question omits
    JdbcPollingChannelAdapter adapter = new JdbcPollingChannelAdapter(dataSource);
    adapter.setRowMapper(new AccountMapper());
    return adapter;
}
You don't call MessageSource.receive() yourself. The framework knows what to do with an @InboundChannelAdapter and how to configure and poll the provided bean.
See more in docs: https://docs.spring.io/spring-integration/reference/html/configuration.html#annotations_on_beans
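A side note on the poller values: as far as I know, the fixedDelay attribute of @Poller is a string that is read as milliseconds by default, so a 120-second interval is written as "120000", not "120". A minimal sketch of that arithmetic in plain Java follows; PollerSettings and its methods are hypothetical helpers for illustration only, not part of Spring Integration.

```java
import java.time.Duration;

// Hypothetical helper, not part of Spring Integration: turns the intervals
// mentioned in the question into the numbers a poller definition would need.
final class PollerSettings {

    // Values taken from the question: poll every 120 seconds for 2 hours.
    static final Duration POLL_INTERVAL = Duration.ofSeconds(120);
    static final Duration TOTAL_RUN_TIME = Duration.ofHours(2);

    // @Poller attributes are strings; the delay is expressed in milliseconds.
    static String fixedDelayAttribute(Duration interval) {
        return String.valueOf(interval.toMillis());
    }

    // Number of whole poll intervals that fit into the total run time.
    static long intervalsIn(Duration total, Duration interval) {
        return total.toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(fixedDelayAttribute(POLL_INTERVAL));          // prints 120000
        System.out.println(intervalsIn(TOTAL_RUN_TIME, POLL_INTERVAL));  // prints 60
    }
}
```

The interval count only shows how many polls to expect in a 2-hour window. The poller itself keeps running until the endpoint is stopped, so the 2-hour limit would have to be enforced separately.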
You don't need XML config if you use Spring Boot: it is better to have everything configured via @Configuration or @Component.
Your usage of the Java DSL (an IntegrationFlow) is wrong. The IntegrationFlow has to be declared as a @Bean, and you don't call configuration-related methods yourself:
@Bean
public IntegrationFlow generateFile(String region) {
    return IntegrationFlows.from("outChannel").handle(listener, "onMessage").get();
}
See docs about Java DSL: https://docs.spring.io/spring-integration/reference/html/dsl.html#java-dsl
Remove the region parameter, since it is not used.
The @Autowired for the outChannel bean is useless: you don't use that property in your code.
There might be other flaws and open questions: your current code needs a lot of cleanup and rework.