Tags: apache-spark, apache-kafka, spark-structured-streaming, spark-streaming-kafka

Does Spark Structured Streaming have some timeout issue when reading streams from a Kafka topic?


I implemented a Spark job that reads a stream from a Kafka topic and processes it with foreachBatch in Structured Streaming.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.Trigger

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "mykafka.broker.io:6667")
  .option("subscribe", "test-topic")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.ssl.truststore.location", "/home/hadoop/cacerts")
  .option("kafka.ssl.truststore.password", tspass)
  .option("kafka.ssl.truststore.type", "JKS")
  .option("kafka.sasl.kerberos.service.name", "kafka")
  .option("kafka.sasl.mechanism", "GSSAPI")
  .option("groupIdPrefix","MY_GROUP_ID")
  .load()

// schema is the StructType describing the JSON payload (defined elsewhere)
val streamservice = df.selectExpr("CAST(value AS STRING)")
  .select(from_json(col("value"), schema).as("data"))
  .select("data.*")


val stream_df = streamservice
  .selectExpr("cast(id as string) id", "cast(x as int) x")

val monitoring_stream = stream_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    if (!batchDF.isEmpty) { /* process the micro-batch here */ }
  }
  .start()
  .awaitTermination()

I have the following questions.

  1. If the Kafka topic does not have data for a long time, will stream_df.writeStream be terminated automatically? Is there any timeout control for this?

  2. If the Kafka topic is deleted from the Kafka broker, will stream_df.writeStream be terminated?

I want the Spark job to keep monitoring the Kafka topic without terminating in both of the above cases. Do I need any special settings for the Kafka connector and/or stream_df.writeStream?


Solution

    1. If the Kafka topic does not have data for a long time, will stream_df.writeStream be terminated automatically? Is there any timeout control for this?

    The termination of the query is independent of the data being processed. Even if no new messages are produced to your Kafka topic, the query will keep running, because it runs as a stream.

    I guess that is what you have already figured out yourself while testing. We use Structured Streaming queries to process data from Kafka, and they have no issues being idle for longer periods (for example over the weekend, outside of business hours).
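    If you want to verify this yourself, the StreamingQuery handle returned by start() exposes status information. A minimal sketch, reusing the names from your snippet (the status checks are added here purely for illustration):

    val monitoring_stream = stream_df.writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        if (!batchDF.isEmpty) { /* process the micro-batch */ }
      }
      .start()

    // isActive stays true even while no new Kafka messages arrive.
    println(monitoring_stream.isActive)   // true as long as the query runs
    println(monitoring_stream.status)     // shows e.g. message = "Waiting for next trigger"

    // lastProgress is null until the first micro-batch has completed.
    println(Option(monitoring_stream.lastProgress).map(_.json).getOrElse("no batch yet"))

    monitoring_stream.awaitTermination()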

    2. If the Kafka topic is deleted from the Kafka broker, will stream_df.writeStream be terminated?

    By default, if you delete the Kafka topic while your query is running, an exception is thrown:

    ERROR MicroBatchExecution: Query [id = b1f84242-d72b-4097-97c9-ee603badc484, runId = 752b0fe4-2762-4fff-8912-f4cffdbd7bdc] terminated with error
    java.lang.IllegalStateException: Partition test-0's offset was changed from 1 to 0, some data may have been missed. 
    Some data may have been lost because they are not available in Kafka any more; either the
     data was aged out by Kafka or the topic may have been deleted before all the data in the
     topic was processed. If you don't want your streaming query to fail on such cases, set the
     source option "failOnDataLoss" to "false".
    

    I mentioned "by default" because the query option failOnDataLoss defaults to true. As explained in the exception message, you can set it to false to keep your streaming query running. This option is described in the Structured Streaming + Kafka Integration Guide as:

    "Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn't work as you expected."