
SpringBoot: Manually ack multiple AMQP messages


I need to manually ack multiple messages in a Rabbit listener, but only after they have been successfully processed and stored. The Spring Boot configuration in use is as follows:

listener:
  concurrency: 2
  max-concurrency: 20
  acknowledge-mode: manual
  prefetch: 30

The messages should be stored in batches of 20. Only when a batch has been stored successfully should the multiple ack be sent. The storage mechanism also has a timeout: after 20 seconds it should store whatever messages have arrived, even if there are fewer than 20. Currently, I have the following code:

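import com.rabbitmq.client.Channel
import groovy.util.logging.Slf4j
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.messaging.handler.annotation.Header
import org.springframework.messaging.handler.annotation.Payload
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.stream.Collectors

import static org.springframework.amqp.support.AmqpHeaders.DELIVERY_TAG

@Slf4j
@Component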
class EventListener {

  @Autowired
  private EventsStorage eventsStorage
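  private ConcurrentMap<Integer, ChannelData> channelEvents = new ConcurrentHashMap<>()
  // Number of events to collect per channel before storing and acking
  private final int batchSize = 20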

  @RabbitListener(queues = 'event-queue')
  void processEvent(@Payload Event event, Channel channel, @Header(DELIVERY_TAG) long tag) {
    log.debug("Event received for channel $channel.channelNumber")
    channelEvents.compute(channel.channelNumber, { k, channelData -> addEventAndStoreIfNeeded(channel, event, tag, channelData) })
  }

  private ChannelData addEventAndStoreIfNeeded(Channel channel, Event event, long tag, ChannelData channelData) {
    // channelData is null for the first event seen on this channel
    ChannelData data = channelData ?: new ChannelData(channel)
    data.addEvent(tag, event)
    if (data.deliveredEvents.size() >= batchSize) {
      // Already inside compute() for this key: store directly rather than
      // calling compute() again, which its contract does not allow
      storeAndAck(data)
    }
    return data
  }

  void storeAndAckChannelEvents(Integer channelNumber) {
    channelEvents.computeIfPresent(channelNumber, { k, channelData ->
      storeAndAck(channelData)
      // Return the value so the mapping is kept; returning null would remove it
      return channelData
    })
  }

  private void storeAndAck(ChannelData channelData) {
    List<DeliveredEvent> deliveredEvents = channelData.deliveredEvents
    if (!deliveredEvents.isEmpty()) {
      def events = deliveredEvents.stream()
        .map({ DeliveredEvent deliveredEvent -> deliveredEvent.event })
        .collect(Collectors.toList())

      // Store first; the ack is only reached if storing did not throw
      eventsStorage.store(events)
      long lastDeliveryTag = deliveredEvents.get(deliveredEvents.size() - 1).deliveryTag
      // multiple = true acks every unacked delivery on this channel up to and including this tag
      channelData.channel.basicAck(lastDeliveryTag, true)
      deliveredEvents.clear()
    }
  }

  @Scheduled(fixedRate = 20000L)
  void storeMessagingEvents() {
    // Pass the channel number (the map key), which is what storeAndAckChannelEvents expects
    channelEvents.forEach({ k, channelData -> storeAndAckChannelEvents(k) })
  }

}

where ChannelData and DeliveredEvent are defined as follows:

class DeliveredEvent {
  // AMQP delivery tags are 64-bit, matching the long passed to addEvent
  long deliveryTag
  Event event
}

class ChannelData {
  Channel channel
  List<DeliveredEvent> deliveredEvents = new ArrayList<>()

  ChannelData(Channel channel) {
    this.channel = channel
  }

  void addEvent(long tag, Event event) {
    deliveredEvents.add(new DeliveredEvent(deliveryTag: tag, event: event))
  }
}
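
The batching rule described above (flush once the batch is full or when a timer fires, then send one multiple ack for the last delivery tag) can be modelled without Spring or RabbitMQ. The following is only a minimal sketch under assumptions: `Acker` and `Storage` are hypothetical stand-ins for `Channel.basicAck(tag, true)` and `EventsStorage.store(...)`, events are plain strings, and both entry points are `synchronized` so that a listener thread and a timer thread never touch the batch at the same time.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchAckSketch {

    /** Hypothetical stand-in for Channel.basicAck(deliveryTag, multiple). */
    interface Acker {
        void ack(long deliveryTag, boolean multiple);
    }

    /** Hypothetical stand-in for EventsStorage.store(events). */
    interface Storage {
        void store(List<String> events);
    }

    /** Pending events of one channel; add() and flush() share the same lock. */
    static class ChannelBatch {
        private final int batchSize;
        private final Storage storage;
        private final Acker acker;
        private final List<String> events = new ArrayList<>();
        private long lastTag;

        ChannelBatch(int batchSize, Storage storage, Acker acker) {
            this.batchSize = batchSize;
            this.storage = storage;
            this.acker = acker;
        }

        /** Called by the listener thread for every delivery. */
        synchronized void add(long tag, String event) {
            events.add(event);
            lastTag = tag;
            if (events.size() >= batchSize) {
                flush(); // re-entrant: this thread already holds the lock
            }
        }

        /** Called by the timer thread, and by add() once the batch is full. */
        synchronized void flush() {
            if (events.isEmpty()) {
                return;
            }
            // Store first; the ack below is only reached if store() did not throw.
            storage.store(new ArrayList<>(events));
            // One ack with multiple = true covers every delivery up to lastTag.
            acker.ack(lastTag, true);
            events.clear();
        }
    }

    public static void main(String[] args) {
        List<Integer> storedSizes = new ArrayList<>();
        List<String> acks = new ArrayList<>();
        ChannelBatch batch = new ChannelBatch(
                3,
                evs -> storedSizes.add(evs.size()),
                (t, m) -> acks.add(t + ":" + m));

        for (long tag = 1; tag <= 4; tag++) {
            batch.add(tag, "event-" + tag);
        }
        batch.flush(); // what the timer would do for the leftover event

        System.out.println(storedSizes + " " + acks); // prints [3, 1] [3:true, 4:true]
    }
}
```

With a batch size of 3 and four deliveries, the first three are stored and acked together, and the timer-style `flush()` picks up the remaining one.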

The Channel in use is com.rabbitmq.client.Channel. The documentation for this interface states:

Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.

So I'm doing quite the opposite: sharing a Channel between the scheduler thread and the SimpleMessageListenerContainer worker threads. The output of my application looks like this:

[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2 
[SimpleAsyncTaskExecutor-4] DEBUG EventListener - Event received for channel 3 
[SimpleAsyncTaskExecutor-5] DEBUG EventListener - Event received for channel 1 
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5 
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4 
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2 
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5 
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4 
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[pool-4-thread-1] DEBUG EventListener - Storing channel 5 events 
[pool-4-thread-1] DEBUG EventListener - Storing channel 2 events 
...

Each SimpleMessageListenerContainer worker thread has its own Channel, which does not change over time.

Given that I have synchronized the scheduler and the SimpleMessageListenerContainer worker threads, does anyone see any reason why this code would not be thread-safe?

Is there another approach I should try for manually acking multiple messages in Spring Boot?


Solution

  • You will be OK as long as you synchronize the threads.

    Bear in mind, though, that if the connection is lost you will get a new consumer, and the scheduler thread will be left holding stale data (the unacked messages will be redelivered).

    Alternatively, you could use container idle events.

    When a consumer thread has been idle for the configured idle interval, the event is published on that same listener thread, so you could do the timed ack there and would not have to worry about synchronization.

    You can also detect a lost connection by listening for consumer failed events.
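
The idle-event idea can be illustrated with a plain-Java model in which one thread both receives deliveries and performs the timed flush, so no locking is needed. This is only a sketch under assumptions, not Spring AMQP's API: the `BlockingQueue` stands in for the consumer, a `poll` timeout stands in for the container's idle event, and `Delivery` and `consume` are hypothetical names. Note that the timeout here measures silence since the last delivery rather than the age of the batch, which differs slightly from a fixed-rate scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class IdleFlushSketch {

    /** Hypothetical delivery: a delivery tag plus a payload. */
    static final class Delivery {
        final long tag;
        final String payload;

        Delivery(long tag, String payload) {
            this.tag = tag;
            this.payload = payload;
        }
    }

    /**
     * Consumes until `total` deliveries have been seen and nothing is pending.
     * Returns the tags that would be passed to basicAck(tag, true).
     */
    static List<Long> consume(BlockingQueue<Delivery> queue, int batchSize,
                              long idleMillis, int total) {
        List<Long> ackedTags = new ArrayList<>();
        List<Delivery> pending = new ArrayList<>();
        int seen = 0;

        while (seen < total || !pending.isEmpty()) {
            Delivery delivery;
            try {
                // A null result means nothing arrived within idleMillis: the "idle event".
                delivery = queue.poll(idleMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }

            boolean idle = (delivery == null);
            if (!idle) {
                pending.add(delivery);
                seen++;
            }

            if (!pending.isEmpty() && (idle || pending.size() >= batchSize)) {
                // Storing the batch would happen here; ack only after it succeeds.
                ackedTags.add(pending.get(pending.size() - 1).tag);
                pending.clear();
            }
        }
        return ackedTags;
    }

    public static void main(String[] args) {
        BlockingQueue<Delivery> queue = new LinkedBlockingQueue<>();
        for (long tag = 1; tag <= 5; tag++) {
            queue.add(new Delivery(tag, "event-" + tag));
        }
        System.out.println(consume(queue, 2, 100, 5)); // prints [2, 4, 5]
    }
}
```

Tags 2 and 4 are acked because the batch filled up; tag 5 is acked only after the queue stayed silent for the idle interval, all on the one consuming thread.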