I have multiple DataFrames and, at the end, I write each of them to a Delta table. There are 5 DataFrames that I need to write to 5 Delta tables in parallel. Can this be done in one notebook?
I am writing the output like this:
query_a_b = metadata_df1.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint/event_hub_df") \
    .outputMode("append") \
    .start("Tables/metadata_df1")

query_a_c = state_df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint1/event_hub_df") \
    .outputMode("append") \
    .start("Tables/state_df")

query_a_d = cols.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint2/event_hub_df") \
    .outputMode("append") \
    .start("Tables/cols")

query_a_e = metadata_df2.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint3/event_hub_df") \
    .outputMode("append") \
    .start("Tables/metadata_df2")

query_a_f = metadata_df3.writeStream \
    .format("delta") \
    .option("checkpointLocation", "Files/checkpoint4/event_hub_df") \
    .outputMode("append") \
    .start("Tables/metadata_df3")
Likewise, I have 30 DataFrames that should be written in parallel to 30 Delta tables. Will these writes run in parallel?
You can start any number of queries in a single SparkSession. They will all run concurrently, sharing the cluster's resources.
You can use spark.streams (sparkSession.streams() in Scala/Java) to get the StreamingQueryManager, which can be used to manage the currently active queries.
spark = ... # spark session
spark.streams.active # get the list of currently active streaming queries
spark.streams.get(id) # get a query object by its unique id
spark.streams.awaitAnyTermination() # block until any one of them terminates
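
For example, a minimal sketch (assuming the queries above have already been started with start()) that uses the manager to check on each running query:

# Print the name, id, and current status of every active streaming query.
for q in spark.streams.active:
    print(q.name, q.id, q.status)  # status shows whether the query is processing or waiting for data
    print(q.lastProgress)          # metrics from the most recent micro-batch (input rows, batch duration, etc.)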
You can use the same approach if you have 30 DataFrames that you need to write to 30 Delta tables in parallel: create 30 separate writeStream queries, one per DataFrame, each with its own checkpoint location, as in the sketch below.
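
Here is a minimal sketch, assuming each DataFrame already exists and reusing the Tables/... path pattern from your question; the mapping of table names to DataFrames and the checkpoint path pattern are illustrative, not required names:

# Illustrative mapping of target table names to streaming DataFrames;
# replace with your actual 30 DataFrames.
streams = {
    "metadata_df1": metadata_df1,
    "state_df": state_df,
    "cols": cols,
    # ... the remaining DataFrames
}

queries = []
for name, df in streams.items():
    q = (df.writeStream
         .format("delta")
         .queryName(name)  # makes the query easy to find in spark.streams.active
         .option("checkpointLocation", f"Files/checkpoint_{name}")  # each query needs its own checkpoint location
         .outputMode("append")
         .start(f"Tables/{name}"))
    queries.append(q)

# All queries are now running concurrently; block the notebook until any of them stops or fails.
spark.streams.awaitAnyTermination()

start() returns immediately, so the loop launches all 30 streams without waiting for each other; awaitAnyTermination() just keeps the notebook alive while they run.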