apache-kafka, spark-streaming

Pause and resume KafkaConsumer in Spark Streaming



I've ended up in a (strange) situation where, briefly, I don't want to consume any new records from Kafka. I want to pause the Spark Streaming consumption (InputDStream[ConsumerRecord]) for all partitions in the topic, do some operations, and finally resume consuming records.

First of all... is this possible?

I've been trying something like this:

import org.apache.kafka.clients.consumer.KafkaConsumer

val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())   // pause every partition currently assigned
// ... do some operations ...
consumer.resume(consumer.assignment())  // resume them afterwards

but I got this:

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

Any help understanding what I'm missing, and why I'm getting empty results when the topic clearly has partitions, would be welcome!
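
I suspect this might be because a consumer only gets partitions assigned after its first poll(); a standalone sketch like the one below (with an assumed 500 ms timeout) should report a non-empty assignment, but I don't see how to do the equivalent with the consumer that Spark manages internally:

consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.poll(500)                       // assignment happens lazily, on the first poll
consumer.pause(consumer.assignment())    // assignment() should now be non-empty
println(s"Paused partitions: ${consumer.paused()}")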

Versions: Kafka 0.10, Spark 2.3.0, Scala 2.11.8


Solution

  • Yes, it is possible. Add checkpointing to your code and pass a path on persistent storage (local disk, S3, HDFS).

    Whenever you start/resume your job, it will pick up the Kafka consumer group info, with the consumer offsets, from the checkpoint and start processing from where it stopped.

    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
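
    A minimal sketch of what functionToCreateContext could look like with the kafka-0-10 direct stream API; the broker address, group id, batch interval, and the checkpointDirectory/topicName values are assumptions for illustration:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("pausable-kafka-stream")
      val ssc  = new StreamingContext(conf, Seconds(10))    // assumed batch interval
      ssc.checkpoint(checkpointDirectory)                    // local disk, S3 or HDFS path

      val kafkaParams = Map[String, Object](
        "bootstrap.servers"  -> "broker:9092",               // assumed broker address
        "key.deserializer"   -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id"           -> "my-consumer-group",         // assumed group id
        "auto.offset.reset"  -> "latest",
        "enable.auto.commit" -> (false: java.lang.Boolean)
      )

      val stream = KafkaUtils.createDirectStream[String, String](
        ssc, PreferConsistent, Subscribe[String, String](Seq(topicName), kafkaParams))

      stream.foreachRDD { rdd =>
        // process each micro-batch; printing the count is just a placeholder
        println(s"records in batch: ${rdd.count()}")
      }
      ssc
    }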
    

    Spark checkpointing is a mechanism that saves not only the offsets but also the serialized state of your DAG of stages and jobs. So whenever you restart your job with new code, it will:

    1. Read and process the serialized data
    2. Clean the cached DAG stages if there are any code changes in your Spark app
    3. Resume processing the new data with the latest code

    Reading from disk is just a one-time operation that Spark needs in order to load the Kafka offsets, the DAG, and the old, incompletely processed data.

    Once that is done, it keeps saving data to disk at the default or the specified checkpoint interval.
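
    Put together, a "pause" then amounts to stopping the job gracefully and a "resume" to recreating the context from the checkpoint. A minimal sketch, reusing the assumed functionToCreateContext from above:

    // "pause": stop the streaming job gracefully; offsets and DAG state are already checkpointed
    context.stop(stopSparkContext = true, stopGracefully = true)

    // "resume": recreate the context from the checkpoint and continue where processing stopped
    val resumed = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    resumed.start()
    resumed.awaitTermination()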

    Spark Streaming provides an option to specify the Kafka group id, but Spark Structured Streaming does not.