spring-integration spring-integration-amqp

How to handle errors after message has been handed off to QueueChannel?


I have 10 RabbitMQ queues, called event.q.0, event.q.1, <...>, event.q.9. Each of these queues receives messages routed from the event.consistent-hash exchange. I want to build a fault-tolerant solution that consumes the messages for a specific event sequentially, since ordering is important. For this I have set up a flow that listens to those queues and routes messages, based on the event ID, to a specific worker flow. The worker flows are based on queue channels, which should guarantee FIFO order for events with a given ID. I have come up with the following setup:

@Bean
public IntegrationFlow eventConsumerFlow(RabbitTemplate rabbitTemplate) {
    return IntegrationFlows
            .from(
                    Amqp.inboundAdapter(new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory()))
                            .configureContainer(c -> c
                                    .adviceChain(deadLetterAdvice()) // retry + republish-to-DLX advice defined below
                                    .addQueueNames(queueNames)
                                    .prefetchCount(amqpProperties.getPreMatch().getDefinition().getQueues().getEvent().getPrefetch())
                            )
                            .messageConverter(rabbitTemplate.getMessageConverter())
            )
            .<Event, String>route(e -> String.format("worker-input-%d", e.getId() % numberOfWorkers))
            .get();
}

private Advice deadLetterAdvice() {
    return RetryInterceptorBuilder
            .stateless()
            .maxAttempts(3)
            .recoverer(recoverer())
            .backOffPolicy(backOffPolicy())
            .build();
}

private ExponentialBackOffPolicy backOffPolicy() {
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(3.0);
    backOffPolicy.setMaxInterval(15000);

    return backOffPolicy;
}

private MessageRecoverer recoverer() {
    return new RepublishMessageRecoverer(
            rabbitTemplate,
            "error.exchange.dlx"
    );
}

@PostConstruct
public void init() {
    for (int i = 0; i < numberOfWorkers; i++) {
        flowContext.registration(workerFlow(MessageChannels.queue(String.format("worker-input-%d", i), queueCapacity).get()))
                .autoStartup(false)
                .id(String.format("worker-flow-%d", i))
                .register();
    }
}

private IntegrationFlow workerFlow(QueueChannel channel) {
    return IntegrationFlows
        .from(channel)
        .<Object, Class<?>>route(Object::getClass, m -> m
                .resolutionRequired(true)
                .defaultOutputToParentFlow()
                .subFlowMapping(EventOne.class, s -> s.handle(oneHandler))
                .subFlowMapping(EventTwo.class, s -> s.handle(anotherHandler))
        )
        .get();
}
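For reference, the retry settings in deadLetterAdvice() and backOffPolicy() imply only two back-off sleeps before the recoverer runs. The plain-Java sketch below models that arithmetic under the usual exponential back-off assumptions: the first sleep equals the initial interval, each later sleep is the previous one times the multiplier, every sleep is capped at the maximum interval, and there is no sleep after the last attempt. BackOffSchedule and sleeps() are hypothetical names used only for illustration, not Spring Retry API.

```java
import java.util.ArrayList;
import java.util.List;

public class BackOffSchedule {

    /**
     * Sleeps between attempts for an exponential back-off (hypothetical helper):
     * starts at initialInterval, multiplies by multiplier after each sleep and is
     * capped at maxInterval. With maxAttempts attempts there are maxAttempts - 1 sleeps.
     */
    static List<Long> sleeps(int maxAttempts, long initialInterval, double multiplier, long maxInterval) {
        List<Long> result = new ArrayList<>();
        long current = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min(current, maxInterval)); // cap each sleep at maxInterval
            current = (long) (current * multiplier);    // grow for the next sleep
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from deadLetterAdvice() / backOffPolicy() in the question
        List<Long> schedule = sleeps(3, 1000, 3.0, 15000);
        long total = schedule.stream().mapToLong(Long::longValue).sum();
        System.out.println("sleeps (ms): " + schedule + ", total: " + total);
    }
}
```

Under these assumptions a failing message is retried after roughly 1 s and 3 s and then handed to the recoverer, so the 15000 ms cap is never reached; it would only start to matter with maxAttempts of 5 or more.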

Now, when, let's say, an error happens in eventConsumerFlow, the retry mechanism works as expected, but when an error happens in workerFlow, the retry no longer works and the message is not sent to the dead letter exchange. I assume this is because once a message is handed off to a QueueChannel, it gets acknowledged automatically. How can I make the retry mechanism work in workerFlow as well, so that if an exception happens there, the message is retried a couple of times and sent to the DLX when the attempts are exhausted?


Solution

  • If you want resiliency, you shouldn't be using queue channels at all; the messages will be acknowledged immediately after they are put in the in-memory queue, so if the server crashes, those messages will be lost.

    You should configure a separate adapter for each queue if you want no message loss.

    That said, to answer the general question, any errors on downstream flows (including after a queue channel) will be sent to the errorChannel defined on the inbound adapter.
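The acknowledgement point described above can be illustrated with a plain-Java sketch. No Spring or RabbitMQ is involved: invokeWithRetry() stands in for the container's retry advice, failingHandler() for oneHandler/anotherHandler, and a List for the dead letter exchange; all of these names are hypothetical. The idea it shows is that a retry wrapped around the listener only sees exceptions thrown while the listener is running, so a failure that happens after a hand-off to an in-memory queue never reaches it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class HandOffDemo {

    /** Hypothetical stand-in for oneHandler/anotherHandler: always fails. */
    static void failingHandler(String payload) {
        throw new IllegalStateException("cannot process " + payload);
    }

    /**
     * Hypothetical stand-in for the container's retry advice: calls the listener up to
     * maxAttempts times; if every attempt throws, the payload is added to deadLetters
     * (standing in for RepublishMessageRecoverer). Returns true if the listener succeeded.
     */
    static boolean invokeWithRetry(String payload, Consumer<String> listener,
                                   int maxAttempts, List<String> deadLetters) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                listener.accept(payload);
                return true;
            } catch (RuntimeException ex) {
                // a real retry template would back off here before the next attempt
            }
        }
        deadLetters.add(payload);
        return false;
    }

    public static void main(String[] args) {
        // 1) Hand-off: the "listener" only enqueues, so it cannot fail and the retry
        //    wrapper reports success (the broker message would be acked at this point).
        List<String> deadLettersHandOff = new ArrayList<>();
        Queue<String> workerQueue = new LinkedBlockingQueue<>();
        boolean handedOff = invokeWithRetry("event-1", workerQueue::add, 3, deadLettersHandOff);
        System.out.println("hand-off succeeded=" + handedOff + ", dead letters=" + deadLettersHandOff);

        // The worker (a poller thread in the real flow) fails later; nothing routes that
        // failure back into the retry loop, which has already returned.
        try {
            failingHandler(workerQueue.poll());
        } catch (RuntimeException ex) {
            System.out.println("worker failed after hand-off: " + ex.getMessage());
        }

        // 2) Synchronous: the handler runs inside the retry wrapper, so its exception
        //    is retried and the message is dead-lettered once attempts are exhausted.
        List<String> deadLettersSync = new ArrayList<>();
        int[] calls = {0};
        boolean processed = invokeWithRetry("event-2",
                p -> { calls[0]++; failingHandler(p); }, 3, deadLettersSync);
        System.out.println("sync processed=" + processed + ", attempts=" + calls[0]
                + ", dead letters=" + deadLettersSync);
    }
}
```

Loosely mapped back to the flows in the question, the first case resembles routing to a QueueChannel, and the second resembles handling the message directly on the listener container thread (for example, one inbound adapter per queue), where the retry advice can see the exception.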