spring-integration, spring-integration-dsl

Spring Integration DSL: configure an idempotent receiver to identify duplicates


I am using DUPS_OK_ACKNOWLEDGE mode while consuming messages from a queue, so I need to detect duplicates and ignore them.

.from(Jms.messageDrivenChannelAdapter(activeMQConnectionFactory)
        .destination(sourceQueue)
        .configureListenerContainer(spec -> {
            spec.sessionTransacted(false);
            spec.sessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
        }))
.transform(orderTransformer)
.handle(orderService, "save")
.get();

I have an idempotent receiver advice.

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    // Key on the JMS message ID. With the default header mapper, Spring Integration
    // exposes it as the "jms_messageId" header (JmsHeaders.MESSAGE_ID), not "JMSMessageId".
    IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(m -> m.getHeaders().get(JmsHeaders.MESSAGE_ID, String.class)));
    // Send duplicates to the "ignoreDuplicates" channel instead of throwing an exception.
    interceptor.setDiscardChannelName("ignoreDuplicates");
    interceptor.setThrowExceptionOnRejection(false);
    return interceptor;
}

I am stuck on two things:

  1. How do I configure/call this advice on the Jms.messageDrivenChannelAdapter?
  2. If I need the metadata store to be in Oracle/MySQL, what does the table look like? Any example links?

Solution

  • In Spring Integration terms, the Idempotent Receiver is a concern of a consumer endpoint, not of a producer, and Jms.messageDrivenChannelAdapter() is a producer.

    If you don't want duplicates passed downstream, configure the Advice on the first consumer after Jms.messageDrivenChannelAdapter(). In your case that is .transform(orderTransformer), so the code might look like this:

    .transform(orderTransformer, e -> e.advice(idempotentReceiverInterceptor()))
    

    For Oracle/MySQL, the MetadataStore implementation to use is JdbcMetadataStore, available since 5.0: https://docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/html/jdbc.html#jdbc-metadata-store
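
To tie the two points together, here is a minimal sketch of how the interceptor from the question might be backed by a JdbcMetadataStore. It is an illustration under assumptions, not a drop-in configuration: the class name IdempotentReceiverConfig is hypothetical, a DataSource bean for the Oracle/MySQL database is assumed to exist elsewhere, and the message key is assumed to be the jms_messageId header (JmsHeaders.MESSAGE_ID) that the default JMS header mapper populates. The table layout in the comment is the generic form as I recall it from the reference documentation; check it against the schema script shipped for your database and version.

```java
import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.handler.advice.IdempotentReceiverInterceptor;
import org.springframework.integration.jdbc.metadata.JdbcMetadataStore;
import org.springframework.integration.jms.JmsHeaders;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import org.springframework.integration.selector.MetadataStoreSelector;

/*
 * Table expected by JdbcMetadataStore by default (generic form, to be
 * verified against the schema script for your database):
 *
 * CREATE TABLE INT_METADATA_STORE (
 *     METADATA_KEY   VARCHAR(255) NOT NULL,
 *     METADATA_VALUE VARCHAR(4000),
 *     REGION         VARCHAR(100) NOT NULL,
 *     CONSTRAINT INT_METADATA_STORE_PK PRIMARY KEY (METADATA_KEY, REGION)
 * );
 */
@Configuration
public class IdempotentReceiverConfig { // hypothetical class name

    // Persistent store for already-seen message IDs; the DataSource is assumed
    // to be defined elsewhere and to point at the Oracle/MySQL database.
    @Bean
    public ConcurrentMetadataStore metadataStore(DataSource dataSource) {
        return new JdbcMetadataStore(dataSource);
    }

    @Bean
    public IdempotentReceiverInterceptor idempotentReceiverInterceptor(
            ConcurrentMetadataStore metadataStore) {
        // The selector accepts a message only if its key is not yet in the store.
        MetadataStoreSelector selector = new MetadataStoreSelector(
                message -> message.getHeaders().get(JmsHeaders.MESSAGE_ID, String.class),
                metadataStore);

        IdempotentReceiverInterceptor interceptor = new IdempotentReceiverInterceptor(selector);
        // Duplicates go to this channel instead of the advised endpoint;
        // a channel with this name has to exist in the application context.
        interceptor.setDiscardChannelName("ignoreDuplicates");
        interceptor.setThrowExceptionOnRejection(false);
        return interceptor;
    }
}
```

The interceptor bean is then passed to the endpoint with e.advice(...), as in the .transform(...) line above. Because the keys live in the database rather than in memory, duplicates should still be detected after a restart or by another instance sharing the same table.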