How to set the JdbcMessageStore
to the Aggregator so it would use RDBMS rather than the in-memory message store?
Currently AggregatorAnnotationPostProcessor
directly setting new SimpleMessageStore()
to the AggregatingMessageHandler
by the framework.
Here are the configurations and this is working as expected without the JdbcMessageStore
.
@Bean
public ConsumerFactory<?,?> kafkaConsumerFactory(KafkaProperties properties) {
ConsumerProperties props = properties.buildConsumerProperties();
return DefaultKafkaConsumerFactory<>(props);
}
@Bean
@InboundChannelAdapter(channel = "fromChannel", poller = @Poller(fixedDelay = "1000"))
public KafkaMessageSource<String, MyPojo> kafkaMessageSource(ConsumerFactory<String, MyPojo> cf) {
ConsumerProperties props = new ConsumerProperties("topic.in");
return new KafkaMessageSource<>(cf, props);
}
@Bean
public MessageChannel fromChannel() {
return new DirectChannel();
}
@Aggregator(inputChannel = "fromChannel", outputChannel = "toChannel")
public List<MyPojo> aggregate(List<MyPojo> list) {
//apply logic
return newList;
}
@CorrelationStrategy
public Object correate(Message<MyPojo> message) {
//apply logic
//return correlationId; //String
}
@ReleaseStrategy
public boolean checkRelease(Message<MyPojo> message) {
//apply logic
//return canRelease; //boolean
}
@Bean
public ProducerFactory<?,?> kafkaProducerFactory(KafkaProperties properties) {
ConsumerProperties props = properties.buildProducerProperties();
return DefaultKafkaProducerFactory<>(props);
}
@Bean
@ServiceActivator(inputChannel= "toChannel")
public MessageHandler handler(KafkaTemplate<String, List<MyPojo>> kafkaTemplate) {
KafkaProducerMessageHandler<String, List<MyPojo>> handler = new KafkaProducerMessageHandler<>(kafkaTemplate);
handler.setTopicExpression(new LiteralExpression("topic-out"));
return handler;
}
@Bean
public MessageChannel toChannel() {
return new DirectChannel();
}
@Bean
public MessageGroupStore messageGroupStore(DataSource dataSource) {
return new JdbcMessageStore(dataSource);
}
For that purpose we recommend to use more advance configuration via AggregatorFactoryBean
: https://docs.spring.io/spring-integration/reference/aggregator.html#aggregator-annotations
@Bean
@ServiceActivator(inputChannel = "fromChannel")
AggregatorFactoryBean aggregatorFactoryBean(MessageGroupStore messageGroupStore, MessageChannel toChannel) {
AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
aggregatorFactoryBean.setMessageStore(messageGroupStore);
aggregatorFactoryBean.setProcessorBean(...);
aggregatorFactoryBean.setCorrelationStrategy(...);
aggregatorFactoryBean.setReleaseStrategy(...);
aggregatorFactoryBean.setOutputChannel(toChannel);
return aggregatorFactoryBean;
}