apache-spark, apache-kafka, spark-streaming, offset, spark-structured-streaming

Structured Streaming Kafka Source Offset Storage


I am using the Structured Streaming source for Kafka (Integration guide), which, as stated there, does not commit any offsets.

One of my goals is to monitor it (check whether it's lagging behind, etc.). Even though it does not commit offsets, it tracks them by querying Kafka from time to time and checking which offset is next to process. According to the documentation, the offsets are written to HDFS so that they can be recovered after a failure, but the question is:

Where are they stored? Is there any way to monitor the Kafka consumption of a Spark Structured Streaming job from outside the program (with a Kafka CLI or similar; the offset that comes with each record does not suit the use case) if it does not commit offsets?

Cheers


Solution

  • Structured Streaming for Kafka saves offsets to HDFS in the structure below.

    An example checkpointLocation setting:

    .writeStream
    .....
    .option("checkpointLocation", "/tmp/checkPoint")
    .....
    

    In that case, Structured Streaming for Kafka saves the offsets under the path below, one file per batch:

    /tmp/checkPoint/offsets/$'batchid'
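A monitoring tool would usually want the most recent batch file under that path. The following is a minimal Python sketch, not part of Spark itself. It assumes the checkpoint directory is reachable as a local or mounted path (reading directly from HDFS would need an HDFS client instead), and the function name `latest_offsets_file` is hypothetical.

```python
import os


def latest_offsets_file(checkpoint_dir):
    """Return the path of the highest-numbered batch file under
    <checkpoint_dir>/offsets, or None if there is none.

    Assumption: the directory is readable through the local filesystem.
    """
    offsets_dir = os.path.join(checkpoint_dir, "offsets")
    # Batch files are named by their integer batch id; skip anything else
    # (for example temporary or checksum files).
    batch_ids = [int(name) for name in os.listdir(offsets_dir) if name.isdigit()]
    if not batch_ids:
        return None
    return os.path.join(offsets_dir, str(max(batch_ids)))
```

With the setting above, the call would be `latest_offsets_file("/tmp/checkPoint")`. Comparing batch ids as integers rather than as strings matters once there are ten or more batches.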
    

    Each saved file has the format below.

    v1
    {"batchWatermarkMs":0,"batchTimestampMs":$'timestamp',"conf":{"spark.sql.shuffle.partitions":"200"}}
    {"Topic1WithPartiton1":{"0":$'OffsetforTopic1ForPartition0'},"Topic2WithPartiton2":{"1":$'OffsetforTopic2ForPartition1',"0":$'OffsetforTopic2ForPartition0'}}
    

    For example:

    v1
    {"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
    {"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}
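The file layout shown above can be parsed with the standard library alone. This is a rough Python sketch under two assumptions: the file has exactly the three-line layout shown here (version line, batch metadata, offsets), and the query has a single Kafka source. The function name `parse_offsets_file` is hypothetical.

```python
import json


def parse_offsets_file(text):
    """Parse the contents of one offsets/<batchid> file.

    Returns (metadata, offsets), where offsets maps
    topic -> {partition (int): offset (int)}.
    Assumption: a "v1" line, one metadata line, one offsets line.
    """
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if len(lines) < 3 or lines[0] != "v1":
        raise ValueError("unexpected offsets file layout")
    metadata = json.loads(lines[1])
    raw = json.loads(lines[2])
    # JSON object keys are strings, so convert partition ids to integers.
    offsets = {
        topic: {int(p): int(o) for p, o in partitions.items()}
        for topic, partitions in raw.items()
    }
    return metadata, offsets


sample = """v1
{"batchWatermarkMs":0,"batchTimestampMs":1505718000115,"conf":{"spark.sql.shuffle.partitions":"200"}}
{"Topic1WithPartiton1":{"0":21482917},"Topic2WithPartiton2":{"1":103557997,"0":103547910}}
"""
metadata, offsets = parse_offsets_file(sample)
print(offsets["Topic2WithPartiton2"][1])  # prints 103557997
```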
    

    So, to monitor offset lag, I think you need to develop a custom tool that does the following:

    • Read the offsets from HDFS.
    • Commit those offsets to Kafka (the internal __consumer_offsets topic).

    That way, an existing offset lag monitoring tool can monitor the offset lag of Structured Streaming for Kafka.
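If committing offsets back to Kafka is not an option, such a custom tool could also compute the lag itself. The Python sketch below only illustrates the arithmetic. It assumes the checkpointed offsets have already been read from the checkpoint and that the log end offsets per partition were obtained separately (for example with a Kafka client). Both inputs and the function name `compute_lag` are hypothetical.

```python
def compute_lag(checkpoint_offsets, log_end_offsets):
    """Return {(topic, partition): lag}, where
    lag = log end offset - checkpointed offset, floored at 0.

    Both arguments map topic -> {partition (int): offset (int)}.
    """
    lag = {}
    for topic, partitions in checkpoint_offsets.items():
        for partition, offset in partitions.items():
            end = log_end_offsets.get(topic, {}).get(partition)
            if end is None:
                # No end offset known for this partition; skip rather than guess.
                continue
            lag[(topic, partition)] = max(end - offset, 0)
    return lag
```

Partitions without a known end offset are left out of the result, so a missing entry stays distinguishable from a lag of zero.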