Tags: python, pyspark, azure-databricks, spark-structured-streaming, databricks-autoloader

Autoloader - creating a temp view


I'm using Autoloader to incrementally load data from bronze tables into the silver layer, using the Change Data Feed feature.

This is how I read the DataFrame:

# Read the bronze table as a stream, including change data feed rows
df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")

I also define a function that I pass to foreachBatch when writing the stream.

Part of the function looks like this:

from pyspark.sql.functions import col

def update_changefeed(df, epochId):
    # Keep only the change types we care about and expose the batch as a view
    filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
    filtered_df.createOrReplaceTempView("test2")

Running the write part of the code fails:

df.writeStream.foreachBatch(update_changefeed) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True).start().awaitTermination()

The error tells me that the table or view "test2" cannot be found.

I tested the function by creating the temp view from the streaming DataFrame outside the stream, and it worked. I think the problem is that when the function is passed to foreachBatch, the view is not created there. Is there a fix or workaround for this?


Solution

  • Yes @Greencolor, as you said, we need to use a global temporary view.

    from pyspark.sql.functions import col

    def update_changefeed(df, epoch_id):
        ndf1 = df.filter(col("fare_amount") > 10)
        ndf2 = df.filter(col("fare_amount") < 10)
        # Global temp views live in the global_temp database and are visible
        # across sessions in the same application. createOrReplaceGlobalTempView
        # avoids failing on later micro-batches when the view already exists.
        ndf1.createOrReplaceGlobalTempView("test1")
        ndf2.createOrReplaceGlobalTempView("test2")
        spark.sql("SHOW VIEWS FROM global_temp").show()
    

    Output for the above function: SHOW VIEWS FROM global_temp lists both test1 and test2. The results confirm the views are visible outside the batch session, since global temporary views are shared across sessions in the same application.
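
    For completeness, this is roughly how the function plugs into the original write; checkpoint_directory is the same variable as in the question, and the final query runs on the original session after the stream completes:

    df.writeStream.foreachBatch(update_changefeed) \
        .option("checkpointLocation", checkpoint_directory) \
        .trigger(availableNow=True).start().awaitTermination()

    # Global temp views must be qualified with the global_temp database
    spark.sql("SELECT COUNT(*) FROM global_temp.test2").show()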

    This is because, per the documentation, TEMPORARY views are visible only to the session that created them and are dropped when the session ends. As mentioned in the follow-up thread, the DataFrame passed to foreachBatch is created from a session cloned from the original one, so a temp view registered on it is valid only in that cloned session.

    You can see below that the view is printed for the DataFrame's session but not for the original spark session.

    Code:

    from pyspark.sql.functions import col

    def update_changefeed(df, epoch_id):
        ndf1 = df.filter(col("fare_amount") > 10)
        # Registered on the cloned session that owns the batch DataFrame
        ndf1.createOrReplaceTempView("test1")
        # Lists what the cloned session can see - test1 appears here
        temp_views = df.sparkSession.catalog.listTables()
        print(f"Views in this batch: {temp_views}")
        # The original session does not see the temp view
        spark.sql("SHOW VIEWS FROM default").show()
    

    As expected, the printed list from the batch session includes test1, while SHOW VIEWS FROM default on the original session does not show it.
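
    If the view is only needed within the same micro-batch (for example, to run SQL against it), an alternative to the global view is to run the query on the batch DataFrame's own session. A minimal sketch, assuming a runtime where DataFrame.sparkSession is available (PySpark 3.3+):

    from pyspark.sql.functions import col

    def update_changefeed(df, epoch_id):
        filtered_df = df.filter(
            col("_change_type").isin("insert", "update_postimage", "delete")
        )
        filtered_df.createOrReplaceTempView("test2")
        # Resolve the view through the session it was registered on;
        # plain spark.sql("... test2 ...") would fail here.
        df.sparkSession.sql(
            "SELECT _change_type, COUNT(*) AS n FROM test2 GROUP BY _change_type"
        ).show()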