I have data that runs from 1 Jan 2017 to 7 Jan 2017, i.e. exactly one week, and I want a weekly aggregate. I used the window function in the following manner:
val df_v_3 = df_v_2.groupBy(window(col("DateTime"), "7 day"))
  .agg(sum("Value") as "aggregate_sum")
  .select("window.start", "window.end", "aggregate_sum")
The data in my DataFrame looks like this:
DateTime,value
2017-01-01T00:00:00.000+05:30,1.2
2017-01-01T00:15:00.000+05:30,1.30
--
2017-01-07T23:30:00.000+05:30,1.43
2017-01-07T23:45:00.000+05:30,1.4
The output I get is:
2016-12-29T05:30:00.000+05:30,2017-01-05T05:30:00.000+05:30,723.87
2017-01-05T05:30:00.000+05:30,2017-01-12T05:30:00.000+05:30,616.74
This shows the first window starting on 29 Dec 2016, although the actual data starts on 1 Jan 2017. Why does this margin occur?
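For tumbling windows like this, it is possible to set an offset for the starting time; more information can be found in the blog here. By default the windows are aligned to 1970-01-01 00:00:00 UTC, which was a Thursday, so 7-day windows begin on Thursdays at 00:00 UTC; that is 2016-12-29 05:30 at +05:30, the start you are seeing. The offset is a parameter of the sliding-window form of window, but setting both "window duration" and "sliding duration" to the same value makes it equivalent to a tumbling window with a starting offset.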
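To make the alignment concrete, here is a minimal sketch in plain Scala (no Spark involved) of how an epoch-aligned tumbling window start can be computed. The object and function names are made up for illustration, and the arithmetic only mirrors what the window function is documented to do; it is not Spark's actual implementation.

```scala
import java.time.{Duration, Instant, OffsetDateTime, ZoneOffset}

object EpochAlignedWindow {
  // Hypothetical helper: start of the tumbling window that contains `ts`,
  // with window boundaries counted from the Unix epoch plus `offset`.
  def windowStart(ts: Instant, length: Duration, offset: Duration = Duration.ZERO): Instant = {
    val shifted = ts.toEpochMilli - offset.toMillis
    // Distance from the previous window boundary to `ts`
    val intoWindow = java.lang.Math.floorMod(shifted, length.toMillis)
    Instant.ofEpochMilli(ts.toEpochMilli - intoWindow)
  }

  def main(args: Array[String]): Unit = {
    // First timestamp from the question
    val first = OffsetDateTime.parse("2017-01-01T00:00:00.000+05:30").toInstant
    val start = windowStart(first, Duration.ofDays(7))
    // Displayed at +05:30 to match the output in the question
    println(start.atOffset(ZoneOffset.ofHoursMinutes(5, 30))) // 2016-12-29T05:30+05:30
  }
}
```

With no offset, the first record falls into the window that began on Thursday 2016-12-29 00:00 UTC, which matches the first row of the output in the question.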
The syntax is as follows:
window(column, window duration, sliding duration, starting offset)
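With your values, I found that an offset of 64 hours gives a starting time of 2017-01-01 00:00:00. Note that the offset is applied relative to UTC, so the exact value depends on the session time zone: 64 hours corresponds to UTC+08:00, whereas a session at +05:30 would need 66 hours 30 minutes.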
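// Assumes a SparkSession named spark is in scope
import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(("2017-01-01 00:00:00", 1.0),
  ("2017-01-01 00:15:00", 2.0),
  ("2017-01-08 23:30:00", 1.43))
val df = data.toDF("DateTime", "value")
  .withColumn("DateTime", to_timestamp($"DateTime", "yyyy-MM-dd HH:mm:ss"))
// Same window and slide duration: a tumbling window shifted by the offset
val df2 = df
  .groupBy(window(col("DateTime"), "1 week", "1 week", "64 hours"))
  .agg(sum("value") as "aggregate_sum")
  .select("window.start", "window.end", "aggregate_sum")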
This gives the following DataFrame:
+-------------------+-------------------+-------------+
|              start|                end|aggregate_sum|
+-------------------+-------------------+-------------+
|2017-01-01 00:00:00|2017-01-08 00:00:00|          3.0|
|2017-01-08 00:00:00|2017-01-15 00:00:00|         1.43|
+-------------------+-------------------+-------------+
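The 64-hour figure can also be derived rather than found by trial. The sketch below, in plain Scala with java.time only, computes the offset needed for a window to start at a given local time in a given zone, assuming windows are aligned to the Unix epoch. The object name, function name and the two zone IDs are illustrative choices, not part of the Spark API.

```scala
import java.time.{Duration, LocalDateTime, ZoneId}

object WindowOffset {
  // Hypothetical helper: the start offset that makes a tumbling window of
  // `windowLength` begin exactly at `desiredStart` (wall-clock time in `zone`).
  def startOffset(desiredStart: LocalDateTime, zone: ZoneId, windowLength: Duration): Duration = {
    val startMillis = desiredStart.atZone(zone).toInstant.toEpochMilli
    // Boundaries sit at epoch + offset + k * length, so the offset is the
    // remainder of the desired start modulo the window length.
    Duration.ofMillis(java.lang.Math.floorMod(startMillis, windowLength.toMillis))
  }

  def main(args: Array[String]): Unit = {
    val start = LocalDateTime.of(2017, 1, 1, 0, 0)
    val week = Duration.ofDays(7)
    println(startOffset(start, ZoneId.of("Asia/Singapore"), week)) // PT64H
    println(startOffset(start, ZoneId.of("Asia/Kolkata"), week))   // PT66H30M
  }
}
```

The resulting duration can then be written as an interval string for the fourth argument of window, for example "64 hours" or "66 hours 30 minutes".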