
Spring integration - Publisher Confirms with timeout?


This is my current setup:

queue1 and queue2 are merged into one channel (amqpInputChannel) by two integration flows:

@Bean
public IntegrationFlow q1f() {
    return IntegrationFlows
            .from(queue1InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

@Bean
public IntegrationFlow q2f() {
    return IntegrationFlows
            .from(queue2InboundAdapter())
            ...
            .channel(amqpInputChannel())
            .get();
}

Then everything is aggregated, and the source messages are acked once RabbitMQ confirms the aggregated message:

@Bean
public IntegrationFlow aggregatingFlow() {
    return IntegrationFlows
            .from(amqpInputChannel())
            .aggregate(...
                    .expireGroupsUponCompletion(true)
                    .sendPartialResultOnExpiry(true)
                    .groupTimeout(TimeUnit.SECONDS.toMillis(10))
                    .releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10)))
            )
            .handle(amqpOutboundEndpoint())
            .get();
}

@Bean
public AmqpOutboundEndpoint amqpOutboundEndpoint() {
    AmqpOutboundEndpoint outboundEndpoint = new AmqpOutboundEndpoint(ackTemplate());
    outboundEndpoint.setConfirmAckChannel(manualAckChannel());
    outboundEndpoint.setConfirmCorrelationExpressionString("#root");
    outboundEndpoint.setExchangeName(RABBIT_PREFIX + "ix.archiveupdate");
    outboundEndpoint.setRoutingKeyExpression(routingKeyExpression()); // forward using partition id as routing key
    return outboundEndpoint;
}

ackTemplate() uses a connection factory configured with springFactory.setPublisherConfirms(true).
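For reference, the release decision configured above can be modelled in a few lines of plain Java. This is a simplified sketch, not the Spring implementation: `BatchReleaseModel` and `shouldRelease` are hypothetical names, and the sketch only mirrors the count-or-age check that `TimeoutCountSequenceSizeReleaseStrategy(200, TimeUnit.SECONDS.toMillis(10))` performs (the real strategy also releases a group whose sequence is complete).

```java
import java.util.List;

/** Hypothetical plain-Java model of a count-or-timeout release decision (not Spring code). */
final class BatchReleaseModel {

    static final int THRESHOLD = 200;        // same count as in the question's config
    static final long TIMEOUT_MS = 10_000L;  // TimeUnit.SECONDS.toMillis(10)

    /** Returns true if a group with these arrival times should be released at nowMs. */
    static boolean shouldRelease(List<Long> arrivalTimesMs, long nowMs) {
        if (arrivalTimesMs.isEmpty()) {
            return false;                    // nothing to release
        }
        if (arrivalTimesMs.size() >= THRESHOLD) {
            return true;                     // count threshold reached
        }
        long oldest = arrivalTimesMs.stream().mapToLong(Long::longValue).min().getAsLong();
        return nowMs - oldest > TIMEOUT_MS;  // oldest message has waited longer than the timeout
    }

    public static void main(String[] args) {
        System.out.println(shouldRelease(List.of(0L, 1_000L), 5_000L));   // false
        System.out.println(shouldRelease(List.of(0L, 1_000L), 10_001L));  // true
    }
}
```

One design point worth keeping in mind: a release strategy is consulted when a message arrives, so a group that stops receiving messages is flushed by the `groupTimeout(...)` setting (together with `sendPartialResultOnExpiry(true)`), not by the strategy's own timeout.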

The problem is that roughly once every 10 days, some messages get stuck in the unacknowledged state in RabbitMQ.

My guess is that the publish waits for RabbitMQ's publisher confirm, never receives it, and times out. In that case I would never ack the message in queue1. Is this possible?

To recap, the complete workflow is:

[two queues -> direct channel -> aggregator (keeps the channel and delivery-tag values) -> publish to RabbitMQ -> RabbitMQ returns an ACK via publisher confirms -> Spring acks all source messages using the channel and delivery-tag values it kept in memory for the aggregated message]

I also have my own aggregator implementation, since I need to manually ack messages from both q1 and q2:

public abstract class AbstractManualAckAggregatingMessageGroupProcessor extends AbstractAggregatingMessageGroupProcessor {
    public static final String MANUAL_ACK_PAIRS = PREFIX + "manualAckPairs";
    private AckingState ackingState;

    public AbstractManualAckAggregatingMessageGroupProcessor(AckingState ackingState){
        this.ackingState = ackingState;
    }

    @Override
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        Map<String, Object> aggregatedHeaders = super.aggregateHeaders(group);
        List<ManualAckPair> manualAckPairs = new ArrayList<>();
        group.getMessages().forEach(m -> {
            Channel channel = (Channel)m.getHeaders().get(AmqpHeaders.CHANNEL);
            Long deliveryTag = (Long)m.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
            manualAckPairs.add(new ManualAckPair(channel, deliveryTag, ackingState));
        });
        aggregatedHeaders.put(MANUAL_ACK_PAIRS, manualAckPairs);
        return aggregatedHeaders;
    }
}
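The ack-after-confirm bookkeeping described above can be modelled without a broker. In this plain-Java sketch (Java 16+), `AckLedger`, its `SourceDelivery` record and the recorded action strings are hypothetical stand-ins for `ManualAckPair` and for `Channel.basicAck`/`basicNack`. It illustrates why a publisher confirm that never arrives leaves the source messages unacked indefinitely: nothing ever removes that batch from `pending`.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical model: source deliveries are acked only after the aggregated publish is confirmed. */
final class AckLedger {

    /** Stand-in for the question's ManualAckPair: which channel delivered which tag. */
    record SourceDelivery(String channelId, long deliveryTag) { }

    private final Map<String, List<SourceDelivery>> pending = new ConcurrentHashMap<>();
    final List<String> actions = new CopyOnWriteArrayList<>();  // what would be sent to the broker

    /** Called when the aggregated message is published; remembers its source deliveries. */
    void published(String correlationId, List<SourceDelivery> deliveries) {
        pending.put(correlationId, List.copyOf(deliveries));
    }

    /** Called from the confirm callback: ack on a positive confirm, nack with requeue on a negative one. */
    void confirmed(String correlationId, boolean ack) {
        List<SourceDelivery> deliveries = pending.remove(correlationId);
        if (deliveries == null) {
            return;  // unknown or already handled
        }
        for (SourceDelivery d : deliveries) {
            actions.add((ack ? "basicAck " : "basicNack(requeue) ") + d.channelId() + "#" + d.deliveryTag());
        }
    }

    /** Batches whose confirm never arrived; their source messages stay unacked in the broker. */
    int unconfirmedBatches() {
        return pending.size();
    }
}
```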

UPDATE

This is how the RabbitMQ admin UI looks: two messages stay unacked for a long time and are not acked until a restart, when they are redelivered.

[Screenshot: RabbitMQ management UI showing the unacked messages]


Solution

  • In Spring AMQP 2.1 (Spring Integration 5.1), we added a Future<?> and the returned message to CorrelationData to help with this kind of problem. If you are using an older version, you can subclass CorrelationData (and you would have to set the future and the returned message in your own code).

    This, together with a scheduled task, can detect missing acks...

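    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.util.concurrent.FailureCallback;
    import org.springframework.util.concurrent.SuccessCallback;

    @SpringBootApplication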
    @EnableScheduling
    public class Igh2755Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Igh2755Application.class, args);
        }
    
        private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                SuccessCallback<? super Confirm> successCallback = confirm -> {
                    System.out.println((confirm.isAck() ? "A" : "Na") + "ck received");
                };
                FailureCallback failureCallback = throwable -> {
                    System.out.println(throwable.getMessage());
                };
    
                // Good - ack
                CorrelationData correlationData = new CorrelationData("good");
                correlationData.getFuture().addCallback(successCallback, failureCallback);
                this.futures.put(correlationData);
                template.convertAndSend("", "foo", "data", correlationData);
    
                // Missing exchange nack, no return
                correlationData = new CorrelationData("missing exchange");
                correlationData.getFuture().addCallback(successCallback, failureCallback);
                this.futures.put(correlationData);
                template.convertAndSend("missing exchange", "foo", "data", correlationData);
    
                // Missing queue ack, with return
                correlationData = new CorrelationData("missing queue");
                correlationData.getFuture().addCallback(successCallback, failureCallback);
                this.futures.put(correlationData);
                template.convertAndSend("", "missing queue", "data", correlationData);
            };
        }
    
        @Scheduled(fixedDelay = 5_000)
        public void checkForMissingAcks() {
            System.out.println("Checking pending acks");
            CorrelationData correlationData = this.futures.poll();
            while (correlationData != null) {
                try {
                    if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                        if (correlationData.getReturnedMessage() == null) {
                            System.out.println("Ack received OK for " + correlationData.getId());
                        }
                        else {
                            System.out.println("Message returned for " + correlationData.getId());
                        }
                    }
                    else {
                        System.out.println("Nack received for " + correlationData.getId());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("Interrupted");
                }
                catch (ExecutionException e) {
                    System.out.println("Failed to get an ack " + e.getCause().getMessage());
                }
                catch (TimeoutException e) {
                    System.out.println("Timed out waiting for ack for " + correlationData.getId());
                }
                correlationData = this.futures.poll();
            }
            System.out.println("No pending acks, exiting");
        }
    
    }
    

    Output of the scheduled task:

    Checking pending acks
    Ack received OK for good
    Nack received for missing exchange
    Message returned for missing queue
    No pending acks, exiting
    

    With Spring Integration there is a confirmCorrelationExpression which can be used to create the CorrelationData instance.

    EDIT

    With Spring Integration...

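    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.connection.CorrelationData.Confirm;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.integration.amqp.dsl.Amqp;
    import org.springframework.integration.amqp.support.NackedAmqpMessageException;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.messaging.support.ErrorMessage;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;

    @SpringBootApplication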
    @EnableScheduling
    public class Igh2755Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Igh2755Application.class, args);
        }
    
        private final BlockingQueue<CorrelationData> futures = new LinkedBlockingQueue<>();
    
        public interface Gate {
    
            void send(@Header("exchange") String exchange, @Header("rk") String rk, String payload);
    
        }
    
        @Bean
        @DependsOn("flow")
        public ApplicationRunner runner(Gate gate) {
            return args -> {
                gate.send("", "foo", "good");
                gate.send("junque", "rk", "missing exchange");
                gate.send("", "junque", "missing queue");
            };
        }
    
        @Bean
        public IntegrationFlow flow(RabbitTemplate template) {
            return IntegrationFlows.from(Gate.class)
                        .handle(Amqp.outboundAdapter(template)
                                .confirmCorrelationExpression("@correlationCreator.create(#root)")
                                .exchangeNameExpression("headers.exchange")
                                .routingKeyExpression("headers.rk")
                                .returnChannel(returns())
                                .confirmAckChannel(acks())
                                .confirmNackChannel(acks()))
                        .get();
        }
    
        @Bean
        public MessageChannel acks() {
            return new DirectChannel();
        }
    
        @Bean
        public MessageChannel returns() {
            return new DirectChannel();
        }
    
        @Bean
        public IntegrationFlow ackFlow() {
            return IntegrationFlows.from("acks")
                    /*
                     * Work around a bug because the correlation data is wrapped and so the
                     * wrong future is completed.
                     */
                    .handle(m -> {
                        System.out.println(m);
                        if (m instanceof ErrorMessage) { // NACK
                            NackedAmqpMessageException nme = (NackedAmqpMessageException) m.getPayload();
                            CorrelationData correlationData = (CorrelationData) nme.getCorrelationData();
                            correlationData.getFuture().set(new Confirm(false, "Message was returned"));
                        }
                        else {
                            ((CorrelationData) m.getPayload()).getFuture().set(new Confirm(true, null));
                        }
                    })
                    .get();
        }
    
        @Bean
        public IntegrationFlow retFlow() {
            return IntegrationFlows.from("returns")
                    .handle(System.out::println)
                    .get();
        }
    
        @Bean
        public CorrelationCreator correlationCreator() {
            return new CorrelationCreator(this.futures);
        }
    
        public static class CorrelationCreator {
    
            private final BlockingQueue<CorrelationData> futures;
    
            public CorrelationCreator(BlockingQueue<CorrelationData> futures) {
                this.futures = futures;
            }
    
            public CorrelationData create(Message<String> message) {
                CorrelationData data = new CorrelationData(message.getPayload());
                this.futures.add(data);
                return data;
            }
    
        }
    
        @Scheduled(fixedDelay = 5_000)
        public void checkForMissingAcks() {
            System.out.println("Checking pending acks");
            CorrelationData correlationData = this.futures.poll();
            while (correlationData != null) {
                try {
                    if (correlationData.getFuture().get(10, TimeUnit.SECONDS).isAck()) {
                        if (correlationData.getReturnedMessage() == null
                                && !correlationData.getId().equals("Message was returned")) {
                            System.out.println("Ack received OK for " + correlationData.getId());
                        }
                        else {
                            System.out.println("Message returned for " + correlationData.getId());
                        }
                    }
                    else {
                        System.out.println("Nack received for " + correlationData.getId());
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println("Interrupted");
                }
                catch (ExecutionException e) {
                    System.out.println("Failed to get an ack " + e.getCause().getMessage());
    
                }
                catch (TimeoutException e) {
                    System.out.println("Timed out waiting for ack for " + correlationData.getId());
                }
                correlationData = this.futures.poll();
            }
            System.out.println("No pending acks, exiting");
        }
    
    }
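The scheduled task above blocks for up to 10 seconds on each pending future, so one lost confirm delays the check for every entry behind it. A hedged alternative, sketched in plain Java with a hypothetical `ConfirmTracker` class (not a Spring API): record the send time per correlation id, clear it from the ack/nack callback, and let the scheduled method collect overdue ids without blocking. What to do with an overdue id (for example, republish, or nack the source deliveries with requeue) remains an application decision.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical non-blocking variant of checkForMissingAcks(): instead of calling
 * future.get(10, SECONDS) on each entry, remember when each message was sent and
 * periodically report the ones still unconfirmed after a deadline.
 */
final class ConfirmTracker {

    private final Map<String, Long> sentAtMs = new ConcurrentHashMap<>();
    private final long timeoutMs;

    ConfirmTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Call just before publishing with the given correlation id. */
    void sent(String correlationId, long nowMs) {
        sentAtMs.put(correlationId, nowMs);
    }

    /** Call from the confirm callback (ack or nack); the entry is no longer missing. */
    void confirmed(String correlationId) {
        sentAtMs.remove(correlationId);
    }

    /** Call from a scheduled task; removes and returns the ids whose confirm is overdue. */
    List<String> sweepOverdue(long nowMs) {
        List<String> overdue = new ArrayList<>();
        sentAtMs.forEach((id, sentAt) -> {
            // remove(key, value) only succeeds if a concurrent confirm has not already removed it
            if (nowMs - sentAt >= timeoutMs && sentAtMs.remove(id, sentAt)) {
                overdue.add(id);
            }
        });
        overdue.sort(null);  // natural order, for deterministic logging
        return overdue;
    }
}
```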