I'm trying to implement a Spring Batch job that is scheduled by a Spring scheduler.
The current behavior is that the reader only picks up the committed offsets from Kafka after the scheduler process is restarted. The expected behavior is that every scheduled run starts processing from the last committed offset.
To make the reader read the offsets from Kafka, I have set the partitionOffsets property to an empty map as follows:
kafkaItemReader.setPartitionOffsets(new HashMap<>());
This only works when I restart the scheduler process. FYI, the offsets are correctly stored in the __consumer_offsets topic. My question is: am I missing some configuration that makes the KafkaItemReader read the offsets from Kafka when the scheduler is not restarted?
EDIT:
Each time, the scheduler calls jobLauncher.run(job, jobParameters), so I guess it is running a new job instance every time.
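For reference, here is a minimal sketch of that scheduled launch (the fixed delay and the run.timestamp parameter are illustrative, not my exact setup):

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.scheduling.annotation.Scheduled;

// jobLauncher and job are injected elsewhere in the scheduling class.
@Scheduled(fixedDelay = 60000) // interval is illustrative
public void launchJob() throws Exception {
    // A unique parameter (a timestamp here) makes every scheduled run a new JobInstance.
    JobParameters jobParameters = new JobParametersBuilder()
            .addLong("run.timestamp", System.currentTimeMillis())
            .toJobParameters();
    jobLauncher.run(job, jobParameters);
}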
The item reader created is:
public ItemReader<byte[]> readFile() {
    KafkaItemReader<String, byte[]> kafkaItemReader = new KafkaItemReaderBuilder<String, byte[]>()
            .partitions(partitionsList)
            .consumerProperties(props)
            .name("consumer name")
            .saveState(true)
            .topic(topicName)
            .build();
    kafkaItemReader.setPartitionOffsets(new HashMap<>());
    return kafkaItemReader;
}
My Kafka consumer configuration is:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONFIG_BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, KAFKA_CONFIG_GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KAFKA_CONFIG_KEY_DESERIALIZER_CLASS);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KAFKA_CONFIG_VALUE_DESERIALIZER_BYTES_CLASS);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, KAFKA_CONFIG_FETCH_BYTES);
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, KAFKA_CONFIG_FETCH_BYTES);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KAFKA_CONFIG_AUTO_OFFSET_RESET_LATEST);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, KAFKA_CONFIG_ENABLE_AUTO_COMMMIT);
This only works when I restart the scheduler process.
This is probably because your item reader is a singleton bean, having the same state for the entire lifetime of the application context. Things should work as you expect if you make your item reader step-scoped, so that a new instance is created for each scheduled run, and therefore the partition offset would be set to the one stored in Kafka.
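For example, a step-scoped version of the reader definition could look like the following sketch (assuming the same props, partitionsList and topicName fields from your snippet):

import java.util.HashMap;

import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;
import org.springframework.context.annotation.Bean;

@Bean
@StepScope
public KafkaItemReader<String, byte[]> kafkaItemReader() {
    KafkaItemReader<String, byte[]> kafkaItemReader = new KafkaItemReaderBuilder<String, byte[]>()
            .partitions(partitionsList)
            .consumerProperties(props)
            .name("consumer name")
            .saveState(true)
            .topic(topicName)
            .build();
    // An empty map tells the reader to start from the offsets committed in Kafka
    // for the consumer group, rather than from offsets saved in the execution context.
    kafkaItemReader.setPartitionOffsets(new HashMap<>());
    return kafkaItemReader;
}

Returning the concrete KafkaItemReader type (rather than ItemReader) is generally recommended here, so that the step-scope proxy also exposes the ItemStream callbacks (open/update/close) the reader uses to manage the underlying Kafka consumer.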