I am trying to verify how manual offset commits work.
When I try to stop the job, whether with Thread.sleep(), jssc.stop(), or by throwing an exception inside the while loop, I see that the offsets are still committed.
I am sending only a couple of messages as a test, but I see 0 lag as soon as the job starts processing the batch.
When does Spark actually commit the offsets?
JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch the Kafka offsets of this batch so they can be committed manually later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();
    // filter unwanted data
    kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
        // filter logic here
    }).foreachPartition(kafkaRecords -> {
        // initialize DB connections
        while (kafkaRecords.hasNext()) {
            // doing some work here
            // -----> EXCEPTION
            throw new Exception();
        }
    });
    // commit offsets after processing
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
            System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------" + exception.getCause());
            exception.printStackTrace(); // need this for driver
        } else {
            System.out.println("Successfully committed offsets"); // need this for driver
            for (OffsetRange offsetRange : offsetRanges) {
                System.out.println("Offset info: partition " + offsetRange.partition() + ", fromOffset " + offsetRange.fromOffset() + ", untilOffset " + offsetRange.untilOffset());
            }
        }
    });
}); // closes foreachRDD
enable.auto.commit: false
Note the throw new Exception(); in the while loop. Even when the batch fails because of the exception, I see the offsets committed. I expected some lag here because the processing failed. What is wrong here?
If an exception is thrown on a worker node, the task is retried until it has failed spark.task.maxFailures times (the number of failures of any particular task before giving up on the job). Offsets are committed once the DStream batch is processed. You have to handle the exception yourself (for example, by logging the failed record or forwarding it to a DLQ), depending on your use case.
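As a rough illustration of that last point, here is a minimal plain-Java sketch of the "catch per record and forward to a DLQ" pattern. It deliberately has no Spark or Kafka dependency: the iterator stands in for the one passed to foreachPartition, and the names PartitionFailureSketch, process, processPartition and deadLetter are all hypothetical. In a real job the dead-letter sink would probably be a Kafka producer or a log, which is an assumption rather than something from the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch (no Spark dependency); every name here is hypothetical.
final class PartitionFailureSketch {

    // Stand-in for the per-record work done inside foreachPartition.
    static void process(String record) throws Exception {
        if (record.startsWith("bad")) {
            throw new Exception("cannot process " + record);
        }
    }

    // Processes every record in the partition. A failing record is handed to
    // the dead-letter sink instead of failing the whole task.
    // Returns the number of records processed successfully.
    static int processPartition(Iterator<String> records, Consumer<String> deadLetter) {
        int ok = 0;
        while (records.hasNext()) {
            String record = records.next();
            try {
                process(record);
                ok++;
            } catch (Exception e) {
                deadLetter.accept(record); // e.g. log it or forward it to a DLQ
            }
        }
        return ok;
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        int ok = processPartition(Arrays.asList("a", "bad-1", "b").iterator(), dlq::add);
        System.out.println("processed=" + ok + ", deadLettered=" + dlq);
        // prints: processed=2, deadLettered=[bad-1]
    }
}
```

With this shape, a bad record no longer throws out of the partition loop, so the task is not retried for it and the batch can complete. Whether to commit the offsets of a batch that contained dead-lettered records is a design decision that depends on your use case.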