Tags: hive, databricks, spark-structured-streaming, delta-lake

Spark streaming writing as delta and checkpoint location


I am streaming from a Delta table as a source and writing the result back as Delta after performing some transformations. This all worked. I recently watched some videos and read some posts about best practices and found that I needed to make one addition and one modification:

  1. The addition: a queryName for the streaming query.
  2. The modification: moving the checkpoint location so that it resides alongside the data instead of in a separate directory, as I had been doing.

So, I have one question and one problem.

The question: can I add the queryName now, after my stream has been running for some time, without any consequences?

And the problem: now that I have put my checkpoint location in the same directory as my Delta table, I can no longer create the external Hive table. It fails with

pyspark.sql.utils.AnalysisException: Cannot create table ('`spark_catalog`.`schemaname`.`tablename`'). The associated location ('abfss://[email protected]/curated/schemaname/tablename') is not empty but it's not a Delta table

So, this was my original code, which worked

def upsert(microbatchdf, batchId):
   .....some transformations on microbatchdf
   ..........................
   ..........................
    # Create Delta table beforehand as otherwise generated columns can't be created
    # after having written the data into the data lake with the usual partitionBy
    deltaTable = (
        DeltaTable.createIfNotExists(spark)
        .tableName(f"{target_schema_name}.{target_table_name}")
        .addColumns(microbatchdf_deduplicated.schema)
        .addColumn(
            "trade_date_year",
            "INT",
            generatedAlwaysAs="YEAR(trade_date)",
        )
        .addColumn(
            "trade_date_month",
            "INT",
            generatedAlwaysAs="MONTH(trade_date)",
        )
        .addColumn("trade_date_day", "INT", generatedAlwaysAs="DAY(trade_date)")
        .partitionedBy("trade_date_year", "trade_date_month", "trade_date_day")
        .location(
            f"abfss://{target_table_location_filesystem}@{datalakename}.dfs.core.windows.net/{target_table_location_directory}"
        )
        .execute()
    )
   .....some transformations and writing to the delta table
 #end

# This is how the stream is run
streamjob = (
    spark.readStream.format("delta")
    .table(f"{source_schema_name}.{source_table_name}")
    .writeStream.format("delta")
    .outputMode("append")
    .foreachBatch(upsert)
    .trigger(availableNow=True)
    .option(
        "checkpointLocation",
        f"abfss://{target_table_location_filesystem}@{datalakename}.dfs.core.windows.net/curated/checkpoints/",
    )
    .start()
)

streamjob.awaitTermination()

Now, to this working piece of code, I only tried adding the queryName and modifying the checkpoint location (see the comments for the addition and the change):

streamjob = (
    spark.readStream.format("delta")
    .table(f"{source_schema_name}.{source_table_name}")
    .writeStream.format("delta")
    .queryName(f"{source_schema_name}.{source_table_name}") # this added
    .outputMode("append")
    .foreachBatch(upsert)
    .trigger(availableNow=True)
    .option(
        "checkpointLocation",
        f"abfss://{target_table_location_filesystem}@{datalakename}.dfs.core.windows.net/{target_table_location_directory}/_checkpoint", # this changed
    )
    .start()
)

streamjob.awaitTermination()

In my data lake the _checkpoint folder did get created, and apparently it is this folder that makes the external table creation complain about a non-empty directory, even though the documentation I was following suggests keeping the checkpoint alongside the data.

So why does the external Hive table creation fail then? Also, please note my question about adding the queryName to an already running stream.

A point to note: I have tried dropping the external table and removing the contents of that directory, so there is nothing in it except the _checkpoint folder, which got created when I ran the streaming job, just before it got to creating the table inside the upsert method.

If anything is unclear, I am happy to clarify.


Solution

  • The problem is that the checkpoint files are written before you call the `DeltaTable.createIfNotExists` function, which checks whether there is already any data at that location, and it fails because additional files are present that do not belong to a Delta Lake table.

    If you want to keep the checkpoint alongside your data, you need to put `DeltaTable.createIfNotExists(spark)...` outside of the upsert function - in this case the table will be created before any checkpoint files are written (see the sketch below).
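
    A minimal sketch of that restructuring, reusing the variable names from the question, might look as follows. Note the assumptions: the table schema has to be known before the stream starts (here it is taken from the source table, which may not hold if your transformations change the schema), and the body of upsert is only a placeholder for the elided transformations and write.

    from delta.tables import DeltaTable

    target_path = (
        f"abfss://{target_table_location_filesystem}@{datalakename}"
        f".dfs.core.windows.net/{target_table_location_directory}"
    )

    # Create the Delta table once, before the stream starts, so that the target
    # directory is already a valid Delta table when the _checkpoint folder is
    # added to it.
    deltaTable = (
        DeltaTable.createIfNotExists(spark)
        .tableName(f"{target_schema_name}.{target_table_name}")
        # Assumption: the written data has the same schema as the source table;
        # adjust this if your transformations change the schema.
        .addColumns(spark.table(f"{source_schema_name}.{source_table_name}").schema)
        .addColumn("trade_date_year", "INT", generatedAlwaysAs="YEAR(trade_date)")
        .addColumn("trade_date_month", "INT", generatedAlwaysAs="MONTH(trade_date)")
        .addColumn("trade_date_day", "INT", generatedAlwaysAs="DAY(trade_date)")
        .partitionedBy("trade_date_year", "trade_date_month", "trade_date_day")
        .location(target_path)
        .execute()
    )

    def upsert(microbatchdf, batchId):
        # ...the same transformations as in the question; the table no longer
        # has to be created here, only written to (placeholder append shown).
        (
            microbatchdf.write.format("delta")
            .mode("append")
            .saveAsTable(f"{target_schema_name}.{target_table_name}")
        )

    streamjob = (
        spark.readStream.format("delta")
        .table(f"{source_schema_name}.{source_table_name}")
        .writeStream.format("delta")
        .queryName(f"{source_schema_name}.{source_table_name}")
        .outputMode("append")
        .foreachBatch(upsert)
        .trigger(availableNow=True)
        .option("checkpointLocation", f"{target_path}/_checkpoint")
        .start()
    )

    streamjob.awaitTermination()

    With the table created up front, the target directory is still empty when `DeltaTable.createIfNotExists` runs, and it is already a registered Delta table by the time the stream adds the _checkpoint folder, so the "not empty but it's not a Delta table" check no longer fails.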