I'm streaming temperature data from an Azure Event Hub into Databricks and want to store the latest value for each sensor in a Delta table. For each sensor I take the max temperature over the last five minutes. Data arrives every 10-15 seconds per device, and I seem to be hitting a block with the 'upsert' into the Delta table. I'm not sure if I'm using writeStream correctly, or whether I need a window function over the data frame so it only inserts the latest aggregated value.
So far I have created a basic example in PySpark to see if it can be done:
# Imports (note: pyspark's round and max shadow the Python built-ins here)
from pyspark.sql.functions import get_json_object, to_timestamp, from_unixtime, round, max, current_timestamp
# This sets up the data frame from the Event Hub stream
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
# Parse the JSON body and round the timestamp to the nearest 5-minute boundary
df = df.select(
    get_json_object(df.body, '$.sensorId').alias('sensorId'),
    get_json_object(df.body, '$.count').alias('temp'),
    to_timestamp(from_unixtime(round((get_json_object(df.body, '$.timestamp') / 1000) / 300) * 300.0, "yyyy-MM-dd HH:mm:ss")).alias("roundedDatetime")
)
# Group by sensor id and the rounded timestamp, taking the max temp
df = df.groupBy("sensorId", "roundedDatetime").agg(max("temp").cast("int").alias("temp"))
That all works fine and I can see the data at the 5-minute aggregation level.
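In case the rounding line looks odd, here's roughly what it does to a single epoch-millisecond timestamp (plain Python, with a made-up value just to illustrate the bucketing):

# A made-up epoch-millisecond timestamp: 2021-02-03 10:02:40 UTC
ts_ms = 1612346560000
# Down to seconds, into 300-second (5-minute) buckets, then back up to seconds
rounded = round((ts_ms / 1000) / 300) * 300
# rounded == 1612346700, i.e. 2021-02-03 10:05:00 UTC, the nearest 5-minute mark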
# Trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="5 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
# this is my basic batch function, taken from the example docs on streaming
def upsertToDelta(microbatchdf, batchId):
    microbatchdf.createOrReplaceTempView("updates")
    microbatchdf._jdf.sparkSession().sql("""
        MERGE INTO latestSensorReadings t
        USING updates s
        ON s.sensorId = t.sensorId
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
So on the first run into an empty Delta table it's fine, but after five minutes there is a merge conflict, as it's trying to insert the same value again. Is it trying to upsert the whole dataframe rather than just the latest items?
I've had a look at sliding windows doing a group by on the event time, but that didn't seem to work. I'm thinking of adding a windowing function in the micro-batch function so that if there is more than one rounded value per sensor, for example one at 10:00am and one at 10:05am, it only keeps the 10:05 one (see the sketch just below). Recommendations? I think maybe I'm not quite getting the trigger right? I've tried moving it down and up by a minute, but no joy.
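To make the windowing idea concrete, something like this inside the batch function would keep only the newest rounded value per sensor before merging (just a sketch of what I had in mind, not tested):

from pyspark.sql import Window
from pyspark.sql.functions import row_number, col

def upsertToDelta(microbatchdf, batchId):
    # Keep only the most recent rounded window per sensor within this micro-batch
    w = Window.partitionBy("sensorId").orderBy(col("roundedDatetime").desc())
    latest = (microbatchdf
        .withColumn("rn", row_number().over(w))
        .filter("rn = 1")
        .drop("rn"))
    latest.createOrReplaceTempView("updates")
    latest._jdf.sparkSession().sql("""
        MERGE INTO latestSensorReadings t
        USING updates s
        ON s.sensorId = t.sensorId
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)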
So after looking around, and doing a deeper dive into streaming, it looks like I was doing it the wrong way round. For it to do what I want, I need to remove the grouping from the streaming data frame and do it inside the batch function instead.
#This sets up the data frame
df = spark.readStream.format("eventhubs").options(**ehConf).load().selectExpr("cast (body as string) as body")
# get the details from the event hub binary
df = df.select(
    get_json_object(df.body, '$.sensorId').alias('sensorId'),
    get_json_object(df.body, '$.count').alias('temp'))
So I just get the granular details, then every 5 minutes process that batch. The query and my updated batch function look like this:
# Trigger the batch every five minutes
query = (df.writeStream.format("delta").trigger(processingTime="5 minutes").foreachBatch(upsertToDelta).outputMode("update").start())
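One thing to call out: in a real deployment I'd also give the stream a checkpoint location so it can recover its progress after a restart. A sketch of the same query with that option added (the path is hypothetical):

query = (df.writeStream
    .format("delta")
    .trigger(processingTime="5 minutes")
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .option("checkpointLocation", "/mnt/checkpoints/latestSensorReadings")  # hypothetical path
    .start())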
# this is my updated batch function, now doing the grouping
def upsertToDelta(microbatchdf, batchId):
    # Take the max temp per sensor within this micro-batch and stamp it with the processing time
    microbatchdf = microbatchdf.groupBy("sensorId").agg(max("temp").cast("int").alias("temp"))
    microbatchdf = microbatchdf.withColumn("latestDatetime", current_timestamp())
    microbatchdf.createOrReplaceTempView("updates")
    microbatchdf._jdf.sparkSession().sql("""
        MERGE INTO latestSensorReadings t
        USING updates s
        ON s.sensorId = t.sensorId
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)