java multithreading spring amqp spring-amqp

Acting on 403s in Spring-AMQP


I need to guarantee consumer exclusivity with a variable number of consumer threads in different runtimes consuming from a fixed number of queues (where the number of queues is much greater than that of consumers).

My general thought was that I'd have each consumer thread attempt to establish an exclusive connection to clear a queue, and, if it went a given period without receiving a message from that queue, redirect it to another queue.

Even if a queue is temporarily cleared, it's liable to receive messages again in the future, so that queue cannot simply be forgotten about -- instead, a consumer should return to it later. To achieve that rotation, I thought I'd use a queue-of-queues. The danger would be losing references to queues within the queue-of-queues when consumers fail; I thought that seemed solvable with acknowledgements, as follows.

Essentially, each consumer thread waits to get a message (A) with a reference to a queue (1) from the queue-of-queues; message (A) remains initially unacknowledged. The consumer happily attempts to clear queue (1), and once queue (1) remains empty for a given amount of time, the consumer requests a new queue name from the queue-of-queues. Upon receiving a second message (B) and a reference to a new queue (2), the reference to queue (1) is put back on the end of the queue-of-queues as a new message (C), and finally message (A) is acknowledged.

In fact, the queue-of-queues' delivered-at-least-once (and probably only once) guarantee almost gets me exclusivity for the normal queues (1, 2) here. But to make sure I absolutely don't lose references to queues, I need to republish queue (1) as message (C) before I acknowledge message (A). That means if a server fails after republishing queue (1) as message (C) but before acknowledging (A), two references to queue (1) could exist in the queue-of-queues, and exclusivity is no longer guaranteed.
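The rotation and its failure window can be modelled without a broker. The following is only an in-memory sketch: MetaQueueModel and RotationDemo are hypothetical names, no real AMQP calls are made, and requeued messages are appended to the tail for simplicity (a real broker may requeue them elsewhere).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the queue-of-queues; not an AMQP client. */
class MetaQueueModel {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new LinkedHashMap<>();
    private long nextTag = 1;

    /** Appends a queue reference to the end of the queue-of-queues. */
    void publish(String queueName) {
        ready.addLast(queueName);
    }

    /** Models a get without auto-ack: the message stays unacknowledged. */
    long get() {
        String name = ready.removeFirst(); // throws NoSuchElementException if empty
        long tag = nextTag++;
        unacked.put(tag, name);
        return tag;
    }

    /** Returns the queue name carried by an unacknowledged message. */
    String body(long tag) {
        return unacked.get(tag);
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    /** Models the broker requeueing everything a dead consumer had not acked. */
    void consumerDied() {
        for (String name : unacked.values()) {
            ready.addLast(name);
        }
        unacked.clear();
    }

    /** Counts references to a queue, whether ready or unacknowledged. */
    int references(String queueName) {
        int count = 0;
        for (String name : ready) {
            if (name.equals(queueName)) count++;
        }
        for (String name : unacked.values()) {
            if (name.equals(queueName)) count++;
        }
        return count;
    }
}

class RotationDemo {
    /** Normal switch: get (B), republish the old queue as (C), then ack (A). */
    static long rotate(MetaQueueModel meta, long oldTag) {
        long newTag = meta.get();
        meta.publish(meta.body(oldTag));
        meta.ack(oldTag);
        return newTag;
    }

    /** Same steps, but the consumer dies after the republish and before the ack. */
    static void rotateAndCrashBeforeAck(MetaQueueModel meta, long oldTag) {
        meta.get();
        meta.publish(meta.body(oldTag));
        meta.consumerDied();
    }
}
```

In this model a normal rotate leaves exactly one reference to the old queue, while a crash between the republish and the ack leaves two, which is the duplicate described above.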

Therefore, I'd need to use AMQP's exclusive consumer flag, which is great. As it stands, though, I'd also like to NOT republish a reference to a queue if I received a "403 ACCESS REFUSED" for it, so that duplicate references do not proliferate.
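The decision itself could look roughly like the sketch below, once a Throwable for the failed consumer start is in hand (how to obtain it is exactly the open question). AccessRefused and its methods are hypothetical names, and matching on "reply-code=403" or "ACCESS_REFUSED" in the exception message is an assumption about how the client library words the error, not a documented contract.

```java
/** Hypothetical helper: decides whether a queue reference should be republished. */
class AccessRefused {
    private static final int MAX_CAUSE_DEPTH = 32; // guards against cyclic cause chains

    /** Walks the cause chain looking for text that suggests a 403 from the broker. */
    static boolean isAccessRefused(Throwable failure) {
        int depth = 0;
        for (Throwable t = failure; t != null && depth < MAX_CAUSE_DEPTH; t = t.getCause(), depth++) {
            String message = t.getMessage();
            // Assumption: the 403 shows up in the message text somewhere in the chain.
            if (message != null
                    && (message.contains("reply-code=403") || message.contains("ACCESS_REFUSED"))) {
                return true;
            }
        }
        return false;
    }

    /**
     * A 403 means another consumer already holds the queue, so this reference is
     * a duplicate and can be dropped. Any other outcome keeps the reference.
     */
    static boolean shouldRepublish(Throwable startupFailure) {
        return startupFailure == null || !isAccessRefused(startupFailure);
    }
}
```

With the coordinator shown further down, a false result would map to markUnsuccessful (acknowledge without republishing) and a true result to markSuccessful.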

However, I'm using Spring's excellent AMQP library, and I don't see how I can hook in an error handler. The setErrorHandler method exposed on the container doesn't seem to be invoked for "403 ACCESS REFUSED" errors.

Is there a way that I can act on the 403s with the frameworks I'm currently using? Alternatively, is there another way I can achieve the guarantees that I need? My code is below.

The "monitoring service":

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpAuthenticationException;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class ListenerMonitoringService {

    private static final Logger log = LoggerFactory.getLogger(ListenerMonitoringService.class);

    private static final Period EXPIRATION_PERIOD = Period.millis(5000);

    private static final long MONTIORING_POLL_INTERVAL = 5000;
    private static final long MONITORING_INITIAL_DELAY = 5000;

    private final Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier;

    private final QueueCoordinator queueCoordinator;
    private final ScheduledExecutorService executorService;

    private final Collection<Record> records;

    public ListenerMonitoringService(Supplier<AbstractMessageListenerContainer> messageListenerContainerSupplier,
                                     QueueCoordinator queueCoordinator, ScheduledExecutorService executorService) {
        this.messageListenerContainerSupplier = messageListenerContainerSupplier;
        this.queueCoordinator = queueCoordinator;
        this.executorService = executorService;

        records = new ArrayList<>();
    }

    public void registerAndStart(MessageListener messageListener) {
        Record record = new Record(messageListenerContainerSupplier.get());

        // wrap with listener that updates record
        record.container.setMessageListener((MessageListener) (m -> {
            log.trace("{} consumed a message from {}", record.container, Arrays.toString(record.container.getQueueNames()));
            record.freshen(DateTime.now(DateTimeZone.UTC));
            messageListener.onMessage(m);
        }));

        record.container.setErrorHandler(e -> {
            log.error("{} received an error", record.container, e);
            // this doesn't get called for 403s
        });

        // initial start up
        executorService.execute(() -> {
            String queueName = queueCoordinator.getQueueName();

            log.debug("Received queue name {}", queueName);
            record.container.setQueueNames(queueName);

            log.debug("Starting container {}", record.container);
            record.container.start();

            // background monitoring thread
            executorService.scheduleAtFixedRate(() -> {
                log.debug("Checking container {}", record.container);
                if (record.isStale(DateTime.now(DateTimeZone.UTC))) {
                    String newQueue = queueCoordinator.getQueueName();
                    String oldQueue = record.container.getQueueNames()[0];
                    log.debug("Switching queues for {} from {} to {}", record.container, oldQueue, newQueue);
                    record.container.setQueueNames(newQueue);

                    // re-queue the queue we just left, not the one captured at start-up
                    queueCoordinator.markSuccessful(oldQueue);
                }
            }, MONITORING_INITIAL_DELAY, MONTIORING_POLL_INTERVAL, TimeUnit.MILLISECONDS);
        });

        records.add(record);
    }

    private static class Record {
        private static final DateTime DATE_TIME_MIN = new DateTime(0);

        private final AbstractMessageListenerContainer container;
        private Optional<DateTime> lastListened;

        private Record(AbstractMessageListenerContainer container) {
            this.container = container;
            lastListened = Optional.empty();
        }

        public synchronized boolean isStale(DateTime now) {
            log.trace("Comparing now {} to {} for {}", now, lastListened, container);
            return lastListened.orElse(DATE_TIME_MIN).plus(EXPIRATION_PERIOD).isBefore(now);
        }

        public synchronized void freshen(DateTime now) {
            log.trace("Updating last listened to {} for {}", now, container);
            lastListened = Optional.of(now);
        }
    }
}

The "queue-of-queues" handler:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.GetResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.Connection;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetaQueueCoordinator implements QueueCoordinator {

    private static final Logger log = LoggerFactory.getLogger(MetaQueueCoordinator.class);

    private final Channel channel;
    private final Map<String, Envelope> envelopeMap;
    private final RabbitTemplate rabbitTemplate;

    public MetaQueueCoordinator(ConnectionFactory connectionFactory) {
        Connection connection = connectionFactory.createConnection();
        channel = connection.createChannel(false);

        envelopeMap = new ConcurrentHashMap<>();
        rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange("");
        rabbitTemplate.setRoutingKey("queue_of_queues");
    }

    @Override
    public String getQueueName() {
        GetResponse response;
        try {
            response = channel.basicGet("queue_of_queues", false);
        } catch (IOException e) {
            log.error("Unable to get from channel");
            throw new RuntimeException(e);
        }

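        // basicGet returns null when the queue is empty
        if (response == null) {
            throw new IllegalStateException("queue_of_queues is empty");
        }

        String queueName = new String(response.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        envelopeMap.put(queueName, response.getEnvelope());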

        return queueName;
    }

    @Override
    public void markSuccessful(String queueName) {
        Envelope envelope = envelopeMap.remove(queueName);
        if (envelope == null) {
            return;
        }

        log.debug("Putting {} at the end of the line...", queueName);
        rabbitTemplate.convertAndSend(queueName);

        try {
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Unable to acknowledge {}", queueName, e);
        }
    }

    @Override
    public void markUnsuccessful(String queueName) {
        Envelope envelope = envelopeMap.remove(queueName);
        if (envelope == null) {
            return;
        }

        try {
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (IOException e) {
            log.error("Unable to acknowledge {}", queueName, e);
        }
    }
}

Solution

  • The ErrorHandler handles errors thrown during message delivery, not failures while setting up the listener itself, which is where the 403 occurs.

    The upcoming 1.5 release publishes application events when exceptions such as this occur.

    It is due for release later this summer. For now the feature is only available in 1.5.0.BUILD-SNAPSHOT; a release candidate should be available in the next few weeks.

    The project page shows how to get the snapshot from the snapshots repo.