redis spring-data spring-data-redis lettuce spring-data-redis-reactive

Is XCLAIM / claim supported in Spring Data Redis via ReactiveRedisOperations.opsForStream()?


To build a reliable message queue on Redis Streams, I am using spring-boot-starter-data-redis-reactive with the Lettuce driver to process messages from a stream. Through the API exposed by ReactiveRedisOperations.opsForStream() I can add, read, acknowledge and delete messages as part of a consumer group, but I couldn't find a method to claim pending messages that have gone unacknowledged for 5 minutes. The command is available at a lower level via this.reactiveRedisConnectionFactory.getReactiveConnection().streamCommands().xClaim(), but I don't want to write boilerplate code to manage exceptions, serialization, etc. Is there a way to claim a message using ReactiveRedisOperations.opsForStream()?

https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveStreamOperations.html


Solution

  • Without going through the Spring Data Redis stream operations, and using the Lettuce client library directly, I am able to fetch the pending messages as well as claim a message, as shown below.

    public Flux<PendingMessage> getPendingMessages(PollMessage pollMessage, String queueName) {
        // Keep messages that are still within the retry budget (i.e. are NOT poison messages).
        Predicate<PendingMessage> withinRetryLimit = pendingMessage -> pendingMessage.getTotalDeliveryCount() <= maxRetries;
        // Keep messages that have gone unacknowledged for longer than the ack timeout.
        Predicate<PendingMessage> ackTimedOut = pendingMessage -> pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMillis(ackTimeout)) > 0;

        return statefulRedisClusterConnection.reactive()
            // XPENDING over the whole consumer group, capped at 1000 entries
            .xpending(queueName, pollMessage.getConsumerGroupName(), Range.unbounded(), Limit.from(1000))
            .collectList()
            // Convert the raw Lettuce reply into PendingMessages
            .map(it -> ((PendingMessages) PENDING_MESSAGES_CONVERTER
                    .apply(it, pollMessage.getConsumerGroupName()))
                    .withinRange(org.springframework.data.domain.Range.unbounded()))
            .flatMapMany(Flux::fromIterable)
            .filter(ackTimedOut)
            .filter(withinRetryLimit)
            .limitRequest(pollMessage.getBatchSize());
    }
    

    To claim the message, I again used the API available in the Lettuce library:

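    public Flux<StreamMessage<String, String>> claimMessage(PendingMessage pendingMessage, String queueName, String groupName, String serviceName) {
        // The third argument is the minimum idle time: 0 claims the message regardless of how long
        // it has been idle, so this relies on the idle-time filter applied in getPendingMessages.
        return statefulRedisClusterConnection.reactive()
                .xclaim(queueName, Consumer.from(groupName, serviceName), 0, pendingMessage.getIdAsString());
    }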
    

    At the moment, fetching pending messages from Redis through Spring Data has issues, hence I have used the Lettuce library directly to fetch a pending message and claim it.

    https://jira.spring.io/browse/DATAREDIS-1160
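
The selection rule used in getPendingMessages can be tried out without a Redis connection. The following is a minimal sketch in plain Java, not the answer's actual code: the Entry class is a hypothetical stand-in for a pending stream entry, and PendingSelection.claimable is a hypothetical helper name. It keeps entries that have been idle longer than the ack timeout, drops those that exceeded the retry limit, and caps the result at the batch size.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PendingSelection {

    /** Hypothetical stand-in for a pending stream entry (not a Spring Data or Lettuce type). */
    public static final class Entry {
        final String id;
        final Duration idle;     // time since the entry was last delivered
        final long deliveries;   // how many times the entry has been delivered

        public Entry(String id, Duration idle, long deliveries) {
            this.id = id;
            this.idle = idle;
            this.deliveries = deliveries;
        }
    }

    /** Returns the ids of entries worth re-claiming, in input order, at most batchSize of them. */
    public static List<String> claimable(List<Entry> pending, Duration ackTimeout,
                                         long maxRetries, int batchSize) {
        return pending.stream()
                .filter(e -> e.idle.compareTo(ackTimeout) > 0)  // unacknowledged for too long
                .filter(e -> e.deliveries <= maxRetries)        // not a poison message
                .limit(batchSize)
                .map(e -> e.id)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Entry> pending = Arrays.asList(
                new Entry("1-0", Duration.ofMinutes(6), 1),    // timed out, retryable
                new Entry("2-0", Duration.ofMinutes(1), 1),    // still within the ack timeout
                new Entry("3-0", Duration.ofMinutes(10), 9),   // over the retry limit
                new Entry("4-0", Duration.ofMinutes(7), 3));   // timed out, last allowed retry
        // prints [1-0, 4-0]
        System.out.println(claimable(pending, Duration.ofMinutes(5), 3, 10));
    }
}
```

With a 5 minute ack timeout and a retry limit of 3, only the first and last entries qualify; each returned id would then be passed to a claim call such as claimMessage above.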