I'm using Auto Loader to incrementally load data from bronze tables into the silver layer, using the change data feed feature. This is how I read the DataFrame:
df = spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .table("mdp_prd.bronze.nrq_customerassetproperty_autoloader_nodups")
I also define a function that I pass to foreachBatch when writing the stream. Part of the function looks like this:
from pyspark.sql.functions import col

def update_changefeed(df, epochId):
    filtered_df = df.filter(col("_change_type").isin("insert", "update_postimage", "delete"))
    filtered_df.createOrReplaceTempView("test2")
Running the write part of the code fails:
df.writeStream.foreachBatch(update_changefeed) \
    .option("checkpointLocation", checkpoint_directory) \
    .trigger(availableNow=True).start().awaitTermination()
with an error telling me that the table or view "test2" cannot be found.
I tried testing the function by creating the temp view from the streaming DataFrame directly, and it worked. I think the problem is that when the function is passed to foreachBatch, the view is not created there. Is there a fix or workaround for this?
Yes @Greencolor. As you said, we need to use a global temporary view.
from pyspark.sql.functions import col

def update_changefeed(df, id):
    ndf1 = df.filter(col("fare_amount") > 10)
    ndf2 = df.filter(col("fare_amount") < 10)
    # Use the OrReplace variant so later micro-batches don't fail
    # with "temporary view already exists".
    ndf1.createOrReplaceGlobalTempView("test1")
    ndf2.createOrReplaceGlobalTempView("test2")
    spark.sql("show views from global_temp").show()
Output of the above function:
This is because, per the documentation, TEMPORARY views are visible only to the session that created them and are dropped when the session ends. As mentioned in the follow-up solution thread, the DataFrame passed to foreachBatch comes from a session cloned from the original, so a plain temp view is valid only in that cloned session. You can see below that the view is printed for the DataFrame's session but not for the original spark session.
Code:
from pyspark.sql.functions import col

def update_changefeed(df, id):
    ndf1 = df.filter(col("fare_amount") > 10)
    ndf1.createOrReplaceTempView("test1")
    temp_views = df.sparkSession.catalog.listTables()
    print(f"Views in this batch: {temp_views}")
    spark.sql("show views from default").show()