apache-spark · apache-spark-sql · spark-structured-streaming

This query does not support recovering from checkpoint location. Delete checkpoint/testmemeory/offsets to start over


I have created in-memory tables in Spark and am trying to restart the Spark Structured Streaming job after a failure. It fails with "This query does not support recovering from checkpoint location. Delete checkpoint/TEST_IN_MEMORY/offsets to start over."

What is the concept of checkpointing for the in-memory sink? Is there any way to rectify this? (Can we delete the old and the new checkpoint dynamically?)

I am using DataStax 5.1.6 clusters, so I have no choice: I have to stay on Spark 2.0.2.

import java.util.UUID

val kafkaDataFrame_inmemory = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "TEST_IN_MEMORY")
  .option("startingOffsets", "earliest")
  .load()


val checkpoint = "C:/Users/756661/Desktop/KT DOCS/spark/In_MEM_TABLE/" + UUID.randomUUID.toString

kafkaDataFrame_inmemory
  .writeStream
  .format("memory")
  .option("truncate", false)
  .queryName("IN_MEM_TABLE")
  .outputMode("update")
  .option("checkpointLocation", checkpoint)
  .start()

Solution

  • You should simply remove the line .option("checkpointLocation",checkpoint) from your code and start over.

    Per the error message, the memory data source does not support recovering from a checkpoint location. Any attempt to (re)start a streaming query that uses the memory format with an already-existing checkpoint directory will fail.
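    A minimal sketch of the same write side with the checkpoint option removed (reusing the kafkaDataFrame_inmemory DataFrame from the question); without an explicit location, Spark picks a checkpoint directory on its own:

    ```scala
    // Same streaming query as in the question, but without
    // .option("checkpointLocation", checkpoint). Spark falls back to a
    // randomly named checkpoint directory, so a restart after failure
    // never collides with an existing one.
    kafkaDataFrame_inmemory
      .writeStream
      .format("memory")
      .queryName("IN_MEM_TABLE")
      .outputMode("update")
      .start()
    ```

    Note that the memory sink keeps its results in the driver's memory only, so there is nothing durable for a restarted query to recover anyway; dropping the explicit checkpoint simply lets each run start fresh.
    
    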

    org.apache.spark.sql.AnalysisException: This query does not support recovering from checkpoint location. Delete xxx/offsets to start over.;
      at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:240)
      at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:326)
      at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:267)
      ... 49 elided
    

    That is not to say the memory data source won't use a checkpoint directory. It will, but under a randomly generated name.


    can we delete the old and the new checkpoint dynamically?

    Sure, and that is the only way to restart the streaming query with the memory sink.
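
    If you do want to keep an explicit checkpoint location, one way to "delete it dynamically" is to wipe the directory in code before calling start(). A minimal sketch using plain java.io (the path is the one from the question; adjust it to your own setup):

    ```scala
    import java.io.File

    // Recursively delete a directory tree: children first, then the
    // directory itself. Plain java.io keeps this compatible with the
    // Scala 2.11 build that ships with Spark 2.0.2.
    def deleteRecursively(f: File): Unit = {
      if (f.isDirectory) {
        val children = f.listFiles
        if (children != null) children.foreach(deleteRecursively)
      }
      f.delete()
    }

    val checkpointDir = new File("C:/Users/756661/Desktop/KT DOCS/spark/In_MEM_TABLE")
    if (checkpointDir.exists) deleteRecursively(checkpointDir)
    // ...now it is safe to call writeStream...start() with this location again.
    ```

    Run this before every (re)start of the query, since the memory sink fails whenever its checkpoint directory already contains offsets from a previous run.
    
    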