scala, apache-spark, apache-spark-sql, spark-streaming

Spark Structured Streaming - Using different windows for different groupBy keys


Currently I have the following table after reading from a Kafka topic via Spark Structured Streaming:

key,timestamp,value  
-----------------------------------
key1,2017-11-14 07:50:00+0000,10    
key1,2017-11-14 07:50:10+0000,10  
key1,2017-11-14 07:51:00+0000,10    
key1,2017-11-14 07:51:10+0000,10    
key1,2017-11-14 07:52:00+0000,10    
key1,2017-11-14 07:52:10+0000,10  

key2,2017-11-14 07:50:00+0000,10  
key2,2017-11-14 07:51:00+0000,10  
key2,2017-11-14 07:52:10+0000,10  
key2,2017-11-14 07:53:00+0000,10  

I would like to use a different window for each key and perform an aggregation.

For example, key1 would be aggregated with a 1-minute window to yield:

key,window,sum
------------------------------------------
key1,[2017-11-14 07:50:00+0000,2017-11-14 07:51:00+0000],20  
key1,[2017-11-14 07:51:00+0000,2017-11-14 07:52:00+0000],20  
key1,[2017-11-14 07:52:00+0000,2017-11-14 07:53:00+0000],20  

key2 would be aggregated with a 2-minute window to yield:

key,window,sum
------------------------------------------
key2,[2017-11-14 07:50:00+0000,2017-11-14 07:52:00+0000],20  
key2,[2017-11-14 07:52:00+0000,2017-11-14 07:54:00+0000],20  

Currently I do the following:

val l1 = List(("key1", "60 seconds"), ("key2", "120 seconds"))
l1.foreach { case (key, windowDuration) =>

    val filtered_df = df.filter($"key" === key)

    val windowedPlantSum = filtered_df
        .withWatermark("timestamp", "120 minutes")
        .groupBy(
          window($"timestamp", windowDuration),
          $"key"
        )
        .agg(sum("value").alias("sum"))

    //start the stream
}
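
where "//start the stream" stands for something like the following (the console sink and update mode here are just for illustration, not from the original code):

windowedPlantSum.writeStream
    .outputMode("update")
    .format("console")
    .start()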

The above approach starts two separate streaming queries. In my case there are 200 such keys, which would start 200 queries, and this fails due to memory issues.

Is there any way to specify the window per key in Spark Structured Streaming, or is there another approach?


Solution

  • I guess you have to use mapGroupsWithState to manage only one query; a minimal sketch follows after the link below.

    From slide 28 of: https://www.slideshare.net/databricks/arbitrary-stateful-aggregations-using-structured-streaming-in-apache-spark

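    Spark's built-in window() function cannot take a per-key duration, so with (flat)mapGroupsWithState you maintain the windows yourself while running a single query. Below is a minimal sketch of that idea, assuming the per-key window lengths are known up front; the Event and WindowedSum case classes and the windowMillis map are illustrative names, not part of the original post, and expiring finished windows (e.g. via an event-time timeout against the watermark) is omitted for brevity.

    import java.sql.Timestamp
    import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
    import spark.implicits._  // assumes `spark` is the active SparkSession

    // One parsed Kafka record, and one per-(key, window) aggregate
    case class Event(key: String, timestamp: Timestamp, value: Long)
    case class WindowedSum(key: String, windowStart: Timestamp, windowEnd: Timestamp, sum: Long)

    // Hypothetical lookup table: window length per key, in milliseconds
    val windowMillis: Map[String, Long] = Map("key1" -> 60000L, "key2" -> 120000L)

    def updateAcrossEvents(
        key: String,
        events: Iterator[Event],
        state: GroupState[Map[Long, Long]]): Iterator[WindowedSum] = {
      val width = windowMillis.getOrElse(key, 60000L)
      // State maps window-start millis -> running sum for that window
      // (spark.implicits supplies the Map encoder in Spark 2.3+)
      var sums = state.getOption.getOrElse(Map.empty[Long, Long])
      events.foreach { e =>
        val start = (e.timestamp.getTime / width) * width  // align to this key's window grid
        sums = sums.updated(start, sums.getOrElse(start, 0L) + e.value)
      }
      state.update(sums)
      // Emit the running sum of every window seen so far for this key
      sums.iterator.map { case (start, total) =>
        WindowedSum(key, new Timestamp(start), new Timestamp(start + width), total)
      }
    }

    val perKeyWindows = df.as[Event]  // df: the parsed Kafka stream from the question
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(updateAcrossEvents)

    perKeyWindows.writeStream.outputMode("update").format("console").start()

    This runs as one streaming query no matter how many keys there are, since all per-key state lives inside the single flatMapGroupsWithState operator.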