Search code examples
databricksazure-eventhubazure-databricks

Azure Event Hubs to Databricks, what happens to the dataframes in use


I've been developing a proof of concept on Azure Event Hubs Streaming json data to an Azure Databricks Notebook, using Pyspark. In the examples I've seen, I've created my rough code as follows, taking the data from the event hub to the delta table I'll be using as a destination

connectionString = "My End Point"
ehConf = {'eventhubs.connectionString' : connectionString}

df = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .load()

readEventStream = df.withColumn("body", \
 df["body"].cast("string")). \
 withColumn("date_only", to_date(col("enqueuedTime")))

readEventStream.writeStream.format("delta") \
 .outputMode("append") \
 .option("checkpointLocation", "/delta/testSink/streamprocess") \
 .table("testSink") 

After reading around googling, what happens to the df & readEventStream dataframes? Will they just get bigger as they retain the data or will they empty during the normal process? Or is it just a temporary store before dumping the data to the Delta table? Is there a way of setting X amount of items streamed before writing out to the Delta table?

Thanks


Solution

  • I carefully reviewed the description of the APIs you used in the code from the PySpark offical document of pyspark.sql module, I think the memory usage of bigger and bigger was caused by the function table(tableName) as the figure below which is for a DataFrame, not for a streaming DataFrame.

    enter image description here

    So table function create the data strcuture to fill the streaming data in memory.

    I recommanded you need to use start(path=None, format=None, outputMode=None, partitionBy=None, queryName=None, **options) to complete the stream write operation first, then to get a table from delta lake again. And there seems not to be a way for setting X amount of items streamed using PySpark before writing out to the Delta table.