Search code examples
springrabbitmqspring-amqpspring-rabbit

Spring RabbitMq Listener Configuration


We are using RabbitMq with default spring boot configurations. We have a use case in which we want no parallelism for one of the listeners. That is, we want only one thread of the consumer to be running at any given point in time. We want this, because the nature of the use case is such that we want the messages to be consumed in order, thus if there are multiple threads per consumer there can be chances that the messages are processed out of order. Since, we are using the defaults and have not explicitly tweaked the container, we are using the SimpleMessageListenerContainer. By looking at the documentation I tried fixing the number of consumers using concurrency = "1" . The annotation on the target method looks like this @RabbitListener(queues = ["queue-name"], concurrency = "1").

As per the documentation this should have ensured that there is only consumer thread.

{@link org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer * SimpleMessageListenerContainer} if this value is a simple integer, it sets a fixed * number of consumers in the {@code concurrentConsumers} property

2021-10-29 06:11:26.361 INFO  29752 --- [ntContainer#4-1] c.t.t.i.p.s.xxx         : Created xxx
2021-10-29 06:11:26.383 INFO  29752 --- [ntContainer#0-1] c.t.t.i.p.s.xxx         : Created xxx

ThreadIds to be noted here are [ntContainer#4-1] and [ntContainer#0-1].

So the question is- how can we ensure that there is only one thread per consumer at any given point in time ?

Edit: Adding the code of the consumer class for more context

@ConditionalOnProperty(value = ["rabbitmq.sharebooking.enabled"], havingValue = "true", matchIfMissing = false)
class ShareBookingConsumer @Autowired constructor(
    private val shareBookingRepository: ShareBookingRepository,
    private val objectMapper: ObjectMapper,
    private val shareDtoToShareBookingConverter: ShareBookingDtoToShareBookingConverter
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    init {
        logger.info("start sharebooking created consumer")
    }

    @RabbitListener(queues = ["tax_engine.share_booking"], concurrency = "1-1", exclusive = true)
    @Timed
    @Transactional
    fun consumeShareBookingCreatedEvent(message: Message) {
        try {
            consumeShareBookingCreatedEvent(message.body)
        } catch (e: Exception) {
            throw AmqpRejectAndDontRequeueException(e)
        }
    }

    private fun consumeShareBookingCreatedEvent(event: ByteArray) {
        toShareBookingCreationMessageEvent(event).let { creationEvent ->
            RmqMetrics.measureEventMetrics(creationEvent)
            val shareBooking = shareDtoToShareBookingConverter.convert(creationEvent.data)
            val persisted = shareBookingRepository.save(shareBooking)
            logger.info("Created shareBooking ${creationEvent.data.id}")
        }
    }

    private fun toShareBookingCreationMessageEvent(event: ByteArray) =
        objectMapper.readValue(event, shareBookingCreateEventType)

    companion object {
        private val shareBookingCreateEventType =
            object : TypeReference<RMQMessageEnvelope<ShareBookingCreationDto>>() {}
    }
}

Edit: Adding application thread analysis using visualvm
5 threads get created for 5 listeners.

  [1]: https://i.sstatic.net/gQINE.png

Solution

  • Set concurrency = "1-1". Note that the concurrency of the Listener depends not only on concurrentConsumers, but also on maxConcurrentConsumers:

    enter image description here

    If you are using a custom factory:

      @Bean
      public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory cachingConnectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cachingConnectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        return factory;
      }
    
    

    See: https://docs.spring.io/spring-amqp/docs/current/reference/html/#simplemessagelistenercontainer for detail.

    EDIT: I did a simple test, 2 consumers&2 threads:

     @RabbitListener(queues = "myQueue111", concurrency = "1-1")
      public void handleMessage(Object message) throws InterruptedException {
        LOGGER.info("Received message : {} in {}", message, Thread.currentThread().getName());
      }
    
      @RabbitListener(queues = "myQueue222", concurrency = "1-1")
      public void handleMessag1e(Object message) throws InterruptedException {
        LOGGER.info("Received message222 : {} in {}", message, Thread.currentThread().getName());
      }
    

    enter image description here