I use Redis Streams in my Spring Boot application. Within a scheduler I regularly want to get all the pending messages and check how long they are already processing and re-trigger them if necessary.
My problem is now that I can get the pending messages, but I'm not sure how to get the payload.
My first approach used the pending
and range
operations. The downside here is that the totalDeliveryCount is not increased with range
- so I cannot use the range method
val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
if (map != null && map.size > 0) {
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
My second approach used the pending
and read
operations. For the read operation I can specify an offset with the current ID. The problem is that I only get IDs back which are higher then the one specified.
val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream<String, Any>()
.read(it.consumer, StreamReadOptions.empty().count(1),
StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
So when I use ReadOffset.from('1234-0')
I don't get the message with 1234-0
but everything after that message. Is there a way to get the exact message and also honoring the totalDeliveryCount
and elapsedTimeSinceLastDelivery
statistic?
I'm using spring-data-redis 2.3.1.RELEASE
I'm using following workaround now, which should be good for most cases:
return if (id.sequence > 0) {
"${id.timestamp}-${id.sequence - 1}"
} else {
"${id.timestamp - 1}-99999"
}
It relies on the fact that there are not more then 99999 messages inserted per ms.