Streaming data to delta table with saving the latest values


I'm streaming temperature data from an Azure Event Hub into Databricks and want to store the latest values in a Delta table. For each sensor, I take the maximum temperature over the last five minutes. I'm stuck on the 'upsert' into the Delta table. Data arrives every 10-15 seconds for each device. I'm not sure whether I'm using writeStream correctly, or whether I need a window function over the data frame to insert the latest aggregated value.

So far I have created a basic example in PySpark to see if it can be done:

from pyspark.sql import functions as F

# Set up the streaming data frame; ehConf (defined elsewhere) holds the Event Hubs options
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")

# Round the event time to the nearest five minutes (the /1000 assumes epoch milliseconds)
df = df.select(
  F.get_json_object(df.body, '$.sensorId').alias('sensorId'),
  F.get_json_object(df.body, '$.count').alias('temp'),
  F.to_timestamp(F.from_unixtime(F.round((F.get_json_object(df.body, '$.timestamp') / 1000) / 300) * 300.0, "yyyy-MM-dd HH:mm:ss")).alias("roundedDatetime")
)

# Group by sensor id and rounded time; cast to int before max so the comparison is numeric
df = df.groupBy("sensorId", "roundedDatetime").agg(F.max(F.col("temp").cast("int")).alias("temp"))

That all works fine, and I can see the data at the five-minute aggregation level.
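The bucketing arithmetic in the select above can be checked outside Spark. The following is a minimal plain-Python sketch, assuming the timestamp is in epoch milliseconds and formatting in UTC (Spark's `from_unixtime` uses the session time zone instead). The names `nearest_bucket`, `floor_bucket` and `as_utc` are hypothetical helpers, not part of any library. It also shows that `round` picks the nearest five-minute mark, which can lie after the event, whereas flooring always picks the start of the interval.

```python
import math
from datetime import datetime, timezone

BUCKET_SECONDS = 300  # five minutes


def nearest_bucket(timestamp_ms: int) -> int:
    """Round an epoch-millisecond timestamp to the nearest 5-minute mark (epoch seconds)."""
    seconds = timestamp_ms / 1000
    # floor(x + 0.5) rounds halves up, which matches Spark's round() for positive values
    return math.floor(seconds / BUCKET_SECONDS + 0.5) * BUCKET_SECONDS


def floor_bucket(timestamp_ms: int) -> int:
    """Truncate an epoch-millisecond timestamp to the start of its 5-minute interval."""
    return (timestamp_ms // 1000) // BUCKET_SECONDS * BUCKET_SECONDS


def as_utc(epoch_seconds: int) -> str:
    """Format epoch seconds as 'yyyy-MM-dd HH:mm:ss' in UTC."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")


event_ms = 1_600_000_100_000  # 2020-09-13 12:28:20 UTC
print(as_utc(nearest_bucket(event_ms)))  # nearest mark, after the event
print(as_utc(floor_bucket(event_ms)))    # start of the interval containing the event
```

If a reading should always be assigned to the interval it falls in, flooring is the safer choice; rounding to the nearest mark splits each interval across two buckets.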

# This is my basic batch function, taken from the example docs on streaming
def upsertToDelta(microbatchdf, batchId):
  microbatchdf.createOrReplaceTempView("updates")

  # Run the MERGE in the micro-batch's own session so the "updates" view is visible
  microbatchdf._jdf.sparkSession().sql("""
    MERGE INTO latestSensorReadings t
    USING updates s
    ON s.sensorId = t.sensorId
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)

# Should trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="5 minutes").foreachBatch(upsertToDelta).outputMode("update").start())

When I first run this against an empty Delta table, it's fine. After five minutes there is a merge conflict, as it's trying to insert the same value. Is it trying to upsert the whole data frame rather than just the latest items?
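One likely cause, offered as a guess rather than a confirmed diagnosis: the stream is grouped by both sensorId and roundedDatetime, so a micro-batch can hold several rows for one sensor, while the MERGE matches on sensorId alone. Delta rejects a merge in which more than one source row matches the same target row. The plain-Python sketch below, with a hypothetical helper `duplicate_keys` and made-up sample rows, shows the check one could run on a batch to confirm this.

```python
from collections import Counter


def duplicate_keys(rows, key="sensorId"):
    """Return the key values that appear in more than one source row."""
    counts = Counter(row[key] for row in rows)
    return sorted(k for k, n in counts.items() if n > 1)


# Made-up micro-batch: sensor "a" has rows for two five-minute buckets
batch = [
    {"sensorId": "a", "roundedDatetime": "2020-09-13 10:00:00", "temp": 21},
    {"sensorId": "a", "roundedDatetime": "2020-09-13 10:05:00", "temp": 23},
    {"sensorId": "b", "roundedDatetime": "2020-09-13 10:05:00", "temp": 19},
]

# Any key listed here would match the same target row more than once in the MERGE
print(duplicate_keys(batch))
```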

I've looked at sliding windows, doing a group by with the event time, but that didn't seem to work. I'm thinking of adding a window function inside the micro-batch function so that it only inserts the latest value when there is more than one item: for example, with rounded values at 10:00am and 10:05am, it would take the 10:05 one. Recommendations? Maybe I'm not getting the trigger quite right? I've tried moving it down and up by a minute, but no joy.
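The "take the 10:05 one" idea can be sketched in plain Python to make the intended selection concrete. This is only a model of the logic, with a hypothetical function name and made-up rows; in PySpark the same selection is commonly written with `row_number()` over a `Window` partitioned by sensorId and ordered by roundedDatetime descending.

```python
def latest_per_sensor(rows):
    """Keep, for each sensorId, only the row with the greatest roundedDatetime."""
    latest = {}
    for row in rows:
        current = latest.get(row["sensorId"])
        # Timestamps in 'yyyy-MM-dd HH:mm:ss' form compare correctly as strings
        if current is None or row["roundedDatetime"] > current["roundedDatetime"]:
            latest[row["sensorId"]] = row
    return sorted(latest.values(), key=lambda r: r["sensorId"])


# Made-up micro-batch with two buckets for sensor "a"
rows = [
    {"sensorId": "a", "roundedDatetime": "2020-09-13 10:00:00", "temp": 21},
    {"sensorId": "a", "roundedDatetime": "2020-09-13 10:05:00", "temp": 23},
    {"sensorId": "b", "roundedDatetime": "2020-09-13 10:05:00", "temp": 19},
]

deduplicated = latest_per_sensor(rows)
for row in deduplicated:
    print(row)
```

After this step there is at most one source row per sensorId, which is what a MERGE keyed on sensorId needs.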


Solution

  • After looking around and doing a deeper dive into streaming, it looks like I was doing it the wrong way round. To get what I want, I need to remove the grouping from the stream and do it inside the batch function instead.

    from pyspark.sql import functions as F

    # Set up the streaming data frame
    df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
    
    # Get the details from the Event Hub message body
    df = df.select(
      F.get_json_object(df.body, '$.sensorId').alias('sensorId'),
      F.get_json_object(df.body, '$.count').alias('temp'))
    

    So I just read the granular details and then, every 5 minutes, process that batch. My batch function now looks like:

    from pyspark.sql import functions as F

    # This is my updated batch function, now doing the grouping
    def upsertToDelta(microbatchdf, batchId):
    
      # Cast to int before max so the comparison is numeric, not string-based
      microbatchdf = microbatchdf.groupBy("sensorId").agg(F.max(F.col("temp").cast("int")).alias("temp"))
      microbatchdf = microbatchdf.withColumn("latestDatetime", F.current_timestamp())
    
      microbatchdf.createOrReplaceTempView("updates")
      
      microbatchdf._jdf.sparkSession().sql("""
        MERGE INTO latestSensorReadings t
        USING updates s
        ON s.sensorId = t.sensorId
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
      """)
    
    # Trigger a micro-batch on a fixed interval (1 minute here; use "5 minutes" for five-minute batches)
    query = (df.writeStream.format("delta").trigger(processingTime="1 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
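What the batch function does on each trigger can be modelled in plain Python. This is a rough sketch under stated assumptions: a dict named `target` stands in for the latestSensorReadings table, `upsert_batch` is a hypothetical name, and the sample readings are made up. It also shows why the temperature should be cast to a number before taking the maximum, since `get_json_object` returns strings and strings compare character by character.

```python
from datetime import datetime, timezone


def upsert_batch(target, batch, now=None):
    """Aggregate one micro-batch to a max temp per sensor, then upsert into target."""
    now = now or datetime.now(timezone.utc)
    maxima = {}
    for row in batch:
        temp = int(row["temp"])  # cast first: "9" > "10" when compared as strings
        sensor = row["sensorId"]
        maxima[sensor] = max(temp, maxima.get(sensor, temp))
    # One row per sensor remains, so every key is inserted or updated exactly once
    for sensor, temp in maxima.items():
        target[sensor] = {"temp": temp, "latestDatetime": now}
    return target


target = {}  # stands in for the latestSensorReadings table

# First trigger: two readings for "a", one for "b"
upsert_batch(target, [
    {"sensorId": "a", "temp": "9"},
    {"sensorId": "a", "temp": "10"},
    {"sensorId": "b", "temp": "7"},
])
first_a = target["a"]["temp"]

# Second trigger: only "a" reports, so "b" keeps its previous row
upsert_batch(target, [{"sensorId": "a", "temp": "8"}])
print(first_a, target["a"]["temp"], target["b"]["temp"])
```

Note that each trigger overwrites a sensor's row with the maximum of that micro-batch only, so the stored value reflects the most recent trigger interval rather than a rolling maximum.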