I'm trying to build an integration flow that prevents messages from being lost during delivery to an AMQP broker (RabbitMQ). When the broker is stopped, I see some behavior I did not expect.
Please help me investigate. Code example:
    @Bean
    public MessageChannel messageStoreBackedChannel() {
        return new QueueChannel(
                new MessageGroupQueue(jdbcChannelMessageStore(), "Group_ID")
        );
    }

    @Bean
    public IntegrationFlow someFlow() {
        return IntegrationFlows
                .from("messageStoreBackedChannel")
                .channel("amqpMessageChannel")
                .get();
    }

    @Bean
    public IntegrationFlow jmsExtractFlow(EntityManagerFactory entityManagerFactory) {
        return IntegrationFlows
                .from("amqpMessageChannel")
                .handle(message -> System.out.println(message.getPayload()))
                .get();
    }

    @Bean
    public MessageChannel amqpMessageChannel() {
        return new PollableAmqpChannel("amqpMessageChannel", amqpTemplate);
    }

    @Bean
    public JdbcChannelMessageStore jdbcChannelMessageStore() {
        var jdbcChannelMessageStore = new JdbcChannelMessageStore(dataSource);
        jdbcChannelMessageStore.setChannelMessageStoreQueryProvider(new PostgresChannelMessageStoreQueryProvider());
        return jdbcChannelMessageStore;
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata defaultPoller() {
        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setTrigger(new PeriodicTrigger(10));
        return pollerMetadata;
    }
Consider configuring the endpoint between your .from("messageStoreBackedChannel") and .channel("amqpMessageChannel") as transactional().
Something like this:
    .from("messageStoreBackedChannel")
    .bridge(e -> e.poller(p -> p.fixedDelay(10).transactional()))
    .channel("amqpMessageChannel")
So, whenever delivery to the amqpMessageChannel fails, the transaction rolls back and the failed message returns to the store, where it stays until the next poll.
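To make the rollback behavior concrete, here is a minimal plain-Java sketch of one transactional poll cycle. It is only an illustration under simplifying assumptions, not how Spring Integration implements it: the names TransactionalPollSketch, Store and pollOnce are hypothetical, the store is an in-memory deque rather than a JDBC table, and the "rollback" is simulated by putting the message back at the head of the queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

public class TransactionalPollSketch {

    /** Hypothetical in-memory stand-in for the persistent message store. */
    static final class Store {
        private final Deque<String> messages = new ArrayDeque<>();

        void add(String message) {
            messages.addLast(message);
        }

        int size() {
            return messages.size();
        }
    }

    /**
     * Runs one poll cycle: takes the head message and tries to deliver it.
     * If delivery throws, the message is put back at the head of the store,
     * which simulates a transaction rollback. Returns true only on delivery.
     */
    static boolean pollOnce(Store store, Consumer<String> delivery) {
        String message = store.messages.pollFirst();
        if (message == null) {
            return false; // nothing to deliver in this cycle
        }
        try {
            delivery.accept(message);
            return true; // "commit": the message stays removed
        } catch (RuntimeException e) {
            store.messages.addFirst(message); // "rollback": restore the message
            return false;
        }
    }

    public static void main(String[] args) {
        Store store = new Store();
        store.add("order-1");

        // Broker is down: delivery fails, so the message must remain stored.
        boolean delivered = pollOnce(store, m -> {
            throw new IllegalStateException("broker unavailable");
        });
        System.out.println("delivered=" + delivered + ", stored=" + store.size());

        // Broker is back: the next poll picks up the same message again.
        delivered = pollOnce(store, m -> System.out.println("sent " + m));
        System.out.println("delivered=" + delivered + ", stored=" + store.size());
    }
}
```

The point of the sketch is that the message is removed from the store only when delivery succeeds, so a stopped broker leaves it in place for a later poll.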
Of course, you can stop that bridge endpoint when you get an error connecting to RabbitMQ. But how would you then determine that the connection has come back?