Search code examples
apache-sparkapache-kafkaspark-structured-streaming

What happens when you restart a spark job if it encounters unexpected format in the data fed to kafka


I have a question regarding Spark Structured Streaming with Kafka. Suppose that I am running a spark job and every thing is working perfectly. One fine day, my spark job fails because of inconsistencies in data that is fed to kafka. Inconsistencies may be anything like data format issues or junk characters which spark couldn't have processed. In such case, how do we fix the issue? Is there a way we can get into the kafka topic and make changes to the data manually?

If we don't fix the data issue and restart the spark job, it will read the same old row which contributed to failure since we have not yet committed the checkpoint. so how do we get out of this loop. How to fix the data issue in Kafka topic for resuming the aborted spark job?


Solution

  • I would avoid trying to manually change one single message within a Kafka topic unless you really know what you are doing.

    To prevent this from happening in the future, you might want to consider using a schema for your data (in combination with a schema registry).

    For mitigating the problem you described I see the following options:

    • Manually change the offset of the Consumer Group of your structured streaming application
    • create a "new" streaming job that starts reading from a particular offset

    Manually change offset

    When using Sparks structured streaming the consumer group is automatically set by Spark. According to the code the Consumer Group will be defined as:

    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    

    You can change the offset by using the kafka-consumer-groups tool. First identify the actual name of the consumer group by

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
    

    and then set the offset for that consumer group for a particular topic (e.g. offset 100)

    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group spark-kafka-source-1337 --topic topic1 --to-offset 100
    

    If you need to change the offset only for a particular partition you can have a look at the help function of the tool on how to do this.

    Create new Streaming Job

    You could make use of the Spark option startingOffsets as describe in the Spark + Kafka integration guide:

    Option: startingOffsets

    value: "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """

    default: "latest" for streaming, "earliest" for batch

    meaning: The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.

    For this to work, it is important to have a "new" query. That means you need to delete your checkpoint files of your existing job or create complete new application.