
Unable to manually commit offset in Kafka direct stream, Spark Streaming


I am trying to verify that manual offset commits work.

When I try to exit the job, either by using Thread.sleep(), calling jssc.stop(), or throwing an exception in the while loop, I see the offsets being committed.

I am just sending a couple of messages in order to test, but I see 0 lag as soon as the job starts processing the batch.

When does Spark actually commit the offsets?

JavaInputDStream<ConsumerRecord<String, String>> kafkaStream = KafkaUtils.createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

kafkaStream.foreachRDD(kafkaStreamRDD -> {
    // fetch Kafka offsets for manually committing them later
    OffsetRange[] offsetRanges = ((HasOffsetRanges) kafkaStreamRDD.rdd()).offsetRanges();

    // filter unwanted data
    kafkaStreamRDD.filter(new Function<ConsumerRecord<String, String>, Boolean>() {
        @Override
        public Boolean call(ConsumerRecord<String, String> record) {
            // filter logic here
            return true;
        }
    }).foreachPartition(kafkaRecords -> {

        // initializing DB connections

        while (kafkaRecords.hasNext()) {

            // doing some work here

            // -----> EXCEPTION
            throw new Exception();
        }
    });

    // commit offsets after processing
    ((CanCommitOffsets) kafkaStream.inputDStream()).commitAsync(offsetRanges, (offsets, exception) -> {
        if (exception != null) {
            System.out.println("-------------Unable to commit offsets, something went wrong, trace ------------" + exception.getCause());
            exception.printStackTrace(); // need this for driver
        } else {
            System.out.println("Successfully committed offsets"); // need this for driver
            for (OffsetRange offsetRange : offsetRanges) {
                System.out.println("Offset Info: partition " + offsetRange.partition()
                        + ", fromOffset " + offsetRange.fromOffset()
                        + ", untilOffset " + offsetRange.untilOffset());
            }
        }
    });
});

enable.auto.commit : false
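
For reference, the kafkaParams map is built roughly as follows (the broker address and group id shown here are placeholders, not my actual values):

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder broker
kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");          // placeholder group id
kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// manual offset management: the Kafka client must not auto-commit
kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);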

Note the throw new Exception(); in the while loop. Even though the batch fails because of the exception, I see the offsets committed. I am expecting some lag here since the processing failed. What is wrong here?


Solution

  • In case of an exception on a worker node, the task is resubmitted up to spark.task.maxFailures times (the number of failures of any particular task before giving up on the job). Offsets are committed once the DStream batch is processed. You have to handle the exception yourself (logging the error record or forwarding it to a DLQ), depending on your use case; see the sketch below.
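
A rough sketch of what that per-record handling could look like inside foreachPartition (process and sendToDlq are hypothetical helpers, not part of the Spark or Kafka API):

kafkaStreamRDD.foreachPartition(kafkaRecords -> {
    while (kafkaRecords.hasNext()) {
        ConsumerRecord<String, String> record = kafkaRecords.next();
        try {
            // doing some work here
            process(record);          // hypothetical processing method
        } catch (Exception e) {
            // contain the failure to this record so the task (and the batch) still succeeds,
            // then log it or forward it to a dead-letter queue
            sendToDlq(record, e);     // hypothetical DLQ helper
        }
    }
});

Because the exception never escapes the task, the batch completes normally and the commitAsync call that follows commits the offsets, with the failed records accounted for by the log or the DLQ rather than by consumer lag.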