I need to manually ack multiple messages in a rabbit listener only after they are successfully processed and stored. Spring boot configuration that is used is as following
listener:
concurrency: 2
max-concurrency: 20
acknowledge-mode: manual
prefetch: 30
The messages should be stored in batches of 20 at a time. Only when they are successfully stored, the multiple ack should be sent. There's also associated timeout with storage mechanism, which should store the messages after 20 seconds even if there's no 20 of them. Currently, I have the following code
@Slf4j
@Component
class EventListener {
@Autowired
private EventsStorage eventsStorage
private ConcurrentMap<Integer, ChannelData> channelEvents = new ConcurrentHashMap<>()
@RabbitListener(queues = 'event-queue')
void processEvent(@Payload Event event, Channel channel, @Header(DELIVERY_TAG) long tag) {
log.debug("Event received for channel $channel.channelNumber")
channelEvents.compute(channel.channelNumber, { k, channelData -> addEventAndStoreIfNeeded(channel, event, tag, channelData) })
}
private ChannelData addEventAndStoreIfNeeded(Channel channel, Event event, long tag, ChannelData channelData) {
if (channelData) {
channelData.addEvent(tag, event)
if (channelData.getDeliveredEvents().size() >= batchSize) {
storeAndAckChannelEvents(channel.channelNumber)
}
return channelData
} else {
ChannelData newChannelData = new ChannelData(channel)
newChannelData.addEvent(tag, event)
return newChannelData
}
}
void storeAndAckChannelEvents(Integer channelNumber) {
channelEvents.compute(channelNumber, { k, channelData ->
List<DeliveredEvent> deliveredEvents = channelData.deliveredEvents
if (!deliveredEvents.isEmpty()) {
def events = deliveredEvents.stream()
.map({ DeliveredEvent deliveredEvent -> deliveredEvent.event })
.collect(Collectors.toList())
eventsStorage.store(events)
long lastDeliveryTag = deliveredEvents.get(deliveredEvents.size() - 1).deliveryTag
channelData.channel.basicAck(lastDeliveryTag, true)
deliveredEvents.clear()
}
})
}
@Scheduled(fixedRate = 20000L)
void storeMessagingEvents() {
channelEvents.forEach({ k, channelData -> storeAndAckChannelEvents(channelData) })
}
}
where ChannelData
and DeliveredEvent
are as following
class DeliveredMesssagingEvent {
int deliveryTag
Event event
}
class ChannelData {
Channel channel
List<DeliveredEvent> deliveredEvents = new ArrayList<>()
ChannelData(Channel channel) {
this.channel = channel
}
void addEvent(long tag, Event event) {
deliveredEvents.add(new DeliveredEvent(deliveryTag: tag, event: event))
}
}
The Channel
used is com.rabbitmq.client.Channel
. The docs about this interface state:
Channel instances must not be shared between threads. Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
So, I'm doing quite opposite, sharing Channel between Scheduler
and SimpleMessageListenerContainer
worker threads. The output of my application is like this:
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[SimpleAsyncTaskExecutor-4] DEBUG EventListener - Event received for channel 3
[SimpleAsyncTaskExecutor-5] DEBUG EventListener - Event received for channel 1
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[SimpleAsyncTaskExecutor-1] DEBUG EventListener - Event received for channel 5
[SimpleAsyncTaskExecutor-2] DEBUG EventListener - Event received for channel 4
[SimpleAsyncTaskExecutor-3] DEBUG EventListener - Event received for channel 2
[pool-4-thread-1] DEBUG EventListener - Storing channel 5 events
[pool-4-thread-1] DEBUG EventListener - Storing channel 2 events
...
SimpleMessageListenerContainer
worker-threads have their own Channel which does not change over the time.
Taking into account that I synced Scheduler
and SimpleMessageListenerContainer
worker threads, does anyone see any reason why this code is not thread safe?
Is there any other approach that I should try to manually ack multiple messages in Spring boot?
You will be ok as long as you sync the threads.
Bear in mind, though, that if the connection is lost, you will get a new consumer and your sync thread will have stale data (the unack'd messages will be redelivered).
However, you could also use container idle events.
When a consumer thread has been idle for that time, the event is published on the same listener thread, so you could do the timed ack there and you wouldn't have to worry about synchronization.
You can also detect consumer failed events if the connection is lost.