
Kafka Spring Batch Consumer - Commit single offset


I have a problem with a Kafka batch listener in Spring Boot. This is my consumer configuration:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.autoOffsetReset);

    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, this.maxFetchBytesMaxPartition);
    props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, this.maxFetchBytesMax);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, ByteArrayDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, receiveBuffer);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, heartbeatInterval);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, minFetch);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, maxWaitFetch);
    return props;
}

@Bean
public DefaultKafkaConsumerFactory<String, byte[]> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}


@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<>();

    try {
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.setBatchListener(true);
        factory.getContainerProperties().setSyncCommits(false);
        factory.setBatchErrorHandler(new SeekToCurrentBatchErrorHandler());
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    } catch(Exception e) {
        logger.error("Error KafkaListenerContainerFactory: {}", e.getMessage());
    }

    return factory;
}

This is the @KafkaListener:

@KafkaListener(autoStartup = "${kafka-startup}", groupId = "${kafka-group}", topics = "${queue}",
        containerFactory = "kafkaListenerContainerFactory", concurrency = "${concurrency}")
public void listen(@Payload List<byte[]> messages,
                   @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> keys,
                   @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                   @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
                   @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                   Acknowledgment ack) throws Exception {

    int indexQueue = new Random().nextInt(queues.size());

    for (int i = 0; i < messages.size(); i++) {
        // Do something with messages.get(i)
        ack.acknowledge();
    }
}

This does not solve my problem, because ack.acknowledge() commits the whole batch. I need to commit the offset of a single message.

I tried using a KafkaConsumer<String, byte[]> consumer with consumer.commitAsync(), but the result is the same. To test it, the code reads a batch of 3 messages and throws an exception on the third message.

For example: message 1 -> offset 10, message 2 -> offset 11, message 3 -> offset 12

The code reads:

  • message 1 (offset 10) -> ok
  • message 2 (offset 11) -> ok
  • message 3 (offset 12) -> Exception

On the next poll, the code re-reads message 1 at offset 10, but I expected message 3 at offset 12.

Do you have any ideas? Can you help me, please?

Thank you


Solution

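  • The Acknowledgment for a batch listener should only be called once per batch, not once per record.

    Since version 2.3 you can call acknowledgment.nack(thisOneFailed, sleep); where thisOneFailed is the index of the failed record in the batch. The offsets of the records before that index are committed, and the failed record and those after it are redelivered after the sleep time.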

    See https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-offsets

    Starting with version 2.3, the Acknowledgment interface has two additional methods nack(long sleep) and nack(int index, long sleep). The first one is used with a record listener, the second with a batch listener. Calling the wrong method for your listener type will throw an IllegalStateException.
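
As a rough illustration of that behaviour, here is a self-contained sketch in plain Java. It deliberately does not use spring-kafka: Ack and RecordingAck are hypothetical stand-ins that only mimic the documented effect of acknowledge() and nack(int index, long sleep), and process() is a made-up placeholder for the real work. The assumption is that the same loop shape (one acknowledgment call per batch, nack on the first failure) carries over to a real batch listener that receives an Acknowledgment.

```java
import java.util.List;

public class BatchNackSketch {

    /** Hypothetical stand-in for the two Acknowledgment methods a batch listener uses. */
    interface Ack {
        void acknowledge();               // the whole batch was processed
        void nack(int index, long sleep); // the record at 'index' failed
    }

    /** Records where the consumer would resume; an illustration, not Spring's code. */
    static final class RecordingAck implements Ack {
        private final List<Long> offsets;
        long nextOffsetToRead = -1;

        RecordingAck(List<Long> offsets) {
            this.offsets = offsets;
        }

        @Override
        public void acknowledge() {
            // Everything in the batch is committed: resume after the last record.
            nextOffsetToRead = offsets.get(offsets.size() - 1) + 1;
        }

        @Override
        public void nack(int index, long sleep) {
            // Records before 'index' are committed; 'index' and later are redelivered.
            nextOffsetToRead = offsets.get(index);
        }
    }

    /** Same shape as the listener body: exactly one ack call per batch. */
    static void listen(List<String> messages, Ack ack) {
        for (int i = 0; i < messages.size(); i++) {
            try {
                process(messages.get(i));
            } catch (RuntimeException e) {
                ack.nack(i, 1000L); // sleep value is an arbitrary example
                return;             // do not call acknowledge() after nack
            }
        }
        ack.acknowledge();
    }

    /** Placeholder work: fails for messages that start with "bad". */
    static void process(String message) {
        if (message.startsWith("bad")) {
            throw new IllegalStateException("cannot process " + message);
        }
    }

    public static void main(String[] args) {
        RecordingAck ack = new RecordingAck(List.of(10L, 11L, 12L));
        listen(List.of("m1", "m2", "bad-m3"), ack);
        System.out.println("next offset to read: " + ack.nextOffsetToRead);
    }
}
```

With the batch from the question (offsets 10, 11, 12 and a failure on the third message), the sketch prints "next offset to read: 12", which is the behaviour the question asks for.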