
Spring Data Redis Streams: cannot figure out what is happening to my unacknowledged messages


I am using the following code to consume a Redis stream using a Spring Data Redis consumer group, but even though I have commented out the acknowledge command, my messages are not re-read after a server restart.

I would expect that if I didn't acknowledge the message, it should be re-read when the server gets killed and restarted. What am I missing here?

@Bean
@Autowired
public StreamMessageListenerContainer eventStreamPersistenceListenerContainerTwo(RedisConnectionFactory streamRedisConnectionFactory, RedisTemplate streamRedisTemplate) {

        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder().pollTimeout(Duration.ofMillis(100)).build();

        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(streamRedisConnectionFactory,
                        containerOptions);

        container.receive(Consumer.from("my-group", "my-consumer"),
                        StreamOffset.create("event-stream", ReadOffset.latest()),
                        message -> {
                                System.out.println("MessageId: " + message.getId());
                                System.out.println("Stream: " + message.getStream());
                                System.out.println("Body: " + message.getValue());
                                //streamRedisTemplate.opsForStream().acknowledge("my-group", message);
                        });

        container.start();

        return container;
}


Solution

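  • Unacknowledged messages are not redelivered automatically. After a restart the listener container keeps reading with the special > ID, which returns only messages that have never been delivered to any consumer in the group. Messages that were delivered but not acknowledged stay in the consumer's pending entries list and have to be read explicitly by passing a concrete ID such as 0. After reading the Redis documentation on how streams work, I came up with the following, which runs at startup and processes any previously delivered but unacknowledged messages for the consumer: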

    // Check for any previously unacknowledged messages that were delivered to this consumer.
    log.info("STREAM - Checking for previously unacknowledged messages for " + this.getClass().getSimpleName() + " event stream listener.");
    String offset = "0";
    while ((offset = processUnacknowledgedMessage(offset)) != null) {
            log.info("STREAM - Finished processing one unacknowledged message for " + this.getClass().getSimpleName() + " event stream listener: " + offset);
    }
    log.info("STREAM - Finished checking for previously unacknowledged messages for " + this.getClass().getSimpleName() + " event stream listener.");
    
    

    And the method that processes the messages:

    /**
     * Processes and acknowledges the next previously delivered message, beginning
     * after the given message id offset.
     *
     * @param offset The id of the last message that was read ("0" to start from the beginning).
     * @return The id of the message that was just processed, or null if there are no more messages.
     */
    public String processUnacknowledgedMessage(String offset) {
            // Reading with an explicit id instead of ">" returns entries that were already
            // delivered to this consumer but never acknowledged, starting after the offset.
            List<MapRecord> messages = streamRedisTemplate.opsForStream().read(Consumer.from(groupName(), consumerName()),
                            StreamReadOptions.empty().noack().count(1),
                            StreamOffset.create(streamKey(), ReadOffset.from(offset)));
            if (messages == null) {
                    // RedisTemplate operations return null inside a pipeline or transaction.
                    return null;
            }
            String lastMessageId = null;
            for (MapRecord message : messages) {
                    if (log.isDebugEnabled()) log.debug(String.format("STREAM - Processing event(%s) from stream(%s) during startup: %s", message.getId(), message.getStream(), message.getValue()));
                    processRecord(message);
                    if (log.isDebugEnabled()) log.debug(String.format("STREAM - Finished processing event(%s) from stream(%s) during startup.", message.getId(), message.getStream()));
                    // Acknowledging removes the entry from the pending entries list.
                    streamRedisTemplate.opsForStream().acknowledge(groupName(), message);
                    lastMessageId = message.getId().getValue();
            }
            // A null result tells the calling loop that nothing is left to process.
            return lastMessageId;
    }
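
To illustrate why the container never re-reads these messages on its own, here is a toy in-memory model of a single consumer group. It is only a sketch: it is not a Redis client and says nothing about how Spring Data Redis is implemented. The class ConsumerGroupModel and its methods add, readNew, readPending and ack are hypothetical names that mimic the documented behaviour of XADD, XREADGROUP with the > ID, XREADGROUP with an explicit ID such as 0, and XACK.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one Redis consumer group (hypothetical, for illustration only). */
class ConsumerGroupModel {
    // Entry ids in insertion order, standing in for the stream itself.
    private final List<String> stream = new ArrayList<>();
    // Pending entries list: entry id -> consumer it was delivered to.
    private final Map<String, String> pending = new LinkedHashMap<>();
    // Index of the first entry that has never been delivered to the group.
    private int nextUndelivered = 0;

    /** Models XADD. */
    void add(String id) {
        stream.add(id);
    }

    /** Models XREADGROUP with ">": returns only never-delivered entries. */
    List<String> readNew(String consumer) {
        List<String> out = new ArrayList<>(stream.subList(nextUndelivered, stream.size()));
        for (String id : out) {
            pending.put(id, consumer); // delivered, now waiting for an ack
        }
        nextUndelivered = stream.size();
        return out;
    }

    /** Models XREADGROUP with id 0: returns this consumer's unacknowledged entries. */
    List<String> readPending(String consumer) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : pending.entrySet()) {
            if (e.getValue().equals(consumer)) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    /** Models XACK: removes the entry from the pending entries list. */
    boolean ack(String id) {
        return pending.remove(id) != null;
    }

    public static void main(String[] args) {
        ConsumerGroupModel group = new ConsumerGroupModel();
        group.add("1-0");
        group.add("2-0");
        System.out.println(group.readNew("my-consumer"));     // [1-0, 2-0]
        // "Restart": the group state lives on the server, so it survives.
        System.out.println(group.readNew("my-consumer"));     // []
        System.out.println(group.readPending("my-consumer")); // [1-0, 2-0]
        group.ack("1-0");
        System.out.println(group.readPending("my-consumer")); // [2-0]
    }
}
```

The second readNew call corresponds to the restarted listener in the question: it returns nothing because both entries were already delivered, while readPending still returns them until they are acknowledged.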