I am using DUPS_OK_ACKNOWLEDGE mode while consuming messages from a queue, so I need to detect duplicates and ignore them.
.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
        .destination(sourceQueue)
        .configureListenerContainer(spec -> {
            spec.sessionTransacted(false);
            spec.sessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
        }))
.transform(orderTransformer)
.handle(orderService, "save")
.get();
I have an idempotent receiver advice.
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    IdempotentReceiverInterceptor idempotentReceiverInterceptor =
            new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                    (String) m.getHeaders().get("JMSMessageId")));
    idempotentReceiverInterceptor.setDiscardChannelName("ignoreDuplicates");
    idempotentReceiverInterceptor.setThrowExceptionOnRejection(false);
    return idempotentReceiverInterceptor;
}
I am stuck with 2 things
In Spring Integration terms, the Idempotent Receiver applies to a consumer endpoint, not to a producer, and Jms.messageDrivenChannelAdapter() is a producer.
If you don't want duplicates to reach the downstream endpoints, configure the Advice on the consumer that follows Jms.messageDrivenChannelAdapter(). In your case that is .transform(orderTransformer), so the code might look like this:
.transform(orderTransformer, e -> e.advice(idempotentReceiverInterceptor()))
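To make the idea behind that advice concrete, here is a minimal plain-Java sketch of idempotent filtering. It is not Spring Integration's implementation: the class IdempotentFilterSketch and its methods are hypothetical names, and a ConcurrentHashMap stands in for the metadata store. The point is only that an atomic "put if absent" on the message ID decides whether a delivery is the first one or a duplicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical illustration only; not part of Spring Integration.
public class IdempotentFilterSketch {

    // Stand-in for a metadata store: remembers every message ID seen so far.
    private final ConcurrentMap<String, String> seen = new ConcurrentHashMap<>();

    // Returns true the first time an ID is offered and false for every repeat.
    // putIfAbsent is atomic, so concurrent consumers cannot both "win" for one ID.
    public boolean firstDelivery(String messageId) {
        String timestamp = String.valueOf(System.currentTimeMillis());
        return seen.putIfAbsent(messageId, timestamp) == null;
    }

    // Keeps only first deliveries, preserving arrival order.
    public List<String> accept(List<String> messageIds) {
        List<String> accepted = new ArrayList<>();
        for (String id : messageIds) {
            if (firstDelivery(id)) {
                accepted.add(id);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        IdempotentFilterSketch filter = new IdempotentFilterSketch();
        List<String> delivered = Arrays.asList("ID:1", "ID:2", "ID:1", "ID:3", "ID:2");
        System.out.println(filter.accept(delivered)); // prints [ID:1, ID:2, ID:3]
    }
}
```

An in-memory map like this forgets everything on restart, which is why a persistent store matters when redelivery can happen after a crash.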
A MetadataStore for Oracle or MySQL is available as JdbcMetadataStore, since version 5.0: https://docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/html/jdbc.html#jdbc-metadata-store
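As a rough sketch of how the two pieces might be wired together, the configuration below backs the selector with a JdbcMetadataStore. Several things here are assumptions rather than part of the original question: the class name IdempotencyConfig, an injectable DataSource bean, the presence of the metadata store table from the schema scripts shipped with spring-integration-jdbc, and the use of JmsHeaders.MESSAGE_ID (the "jms_messageId" header that the default JMS header mapper populates) as the key instead of the header name used in the question.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.handler.advice.IdempotentReceiverInterceptor;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.jms.JmsHeaders;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.selector.MetadataStoreSelector;

// Hypothetical configuration class; names are illustrative.
@Configuration
public class IdempotencyConfig {

    private final DataSource dataSource;

    // Assumes a DataSource bean for the Oracle/MySQL database already exists.
    public IdempotencyConfig(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    // Persists seen keys in the database so they survive restarts.
    @Bean
    public ConcurrentMetadataStore metadataStore() {
        return new JdbcMetadataStore(this.dataSource);
    }

    @Bean
    public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
        // Key each message by its JMS message ID and check it against the JDBC store.
        MetadataStoreSelector selector = new MetadataStoreSelector(
                m -> (String) m.getHeaders().get(JmsHeaders.MESSAGE_ID),
                metadataStore());
        IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(selector);
        interceptor.setDiscardChannelName("ignoreDuplicates");
        interceptor.setThrowExceptionOnRejection(false);
        return interceptor;
    }
}
```

With a bean like this in place, the .transform(orderTransformer, e -> e.advice(idempotentReceiverInterceptor())) line shown earlier would route duplicates to the ignoreDuplicates channel instead of passing them to orderService.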