The start time of my 5-minute tumbling window seems incorrect. The window start time and the current time are always 1 minute apart, but they should be at least 5 minutes apart.
I am performing aggregation on a 5-minute tumbling window and storing the aggregations in DynamoDB. In each DynamoDB record, I store the window timestamp and the current time. The window timestamp always falls in the last minute of the 5-minute window, and the current time is always at the end of the window. The current time seems correct, but the window timestamp should be 5 minutes before the end of the window. The aggregated data being stored is correct, so I think I must be reading the window start time incorrectly.
I have the aggregation stream defined like this:
    DataStream<Aggregation> stream = inputStream
        .keyBy(new KeySelector())
        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
        .aggregate(new Aggregator());
My ElementConverter looks like this:
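    public class DynamoDbAggregationConverter implements ElementConverter<Aggregation, DynamoDbWriteRequest> {
        @Override
        public DynamoDbWriteRequest apply(Aggregation agg, SinkWriter.Context context) {
            // Record timestamp from the sink context, in epoch milliseconds.
            // I assumed this was the window start, which turned out to be wrong.
            long startWindowTimestamp = context.timestamp();
            // Wall-clock time when the record is written, in epoch seconds.
            long createTime = Instant.now().getEpochSecond();
            .....
        }
    }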
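I've discovered that the SinkWriter.Context does not expose the window boundaries (I'm not sure what it is used for).
A ProcessWindowFunction is needed to get the window boundaries, as described in the documentation: inside process(), context.window().getStart() and context.window().getEnd() return them, and the function can be passed as a second argument to aggregate(...) so the incremental aggregation is kept.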
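For anyone who cannot change the pipeline, here is a minimal sketch of recovering the boundaries by arithmetic instead. It rests on two assumptions that should be checked against your Flink version: that the timestamp attached to a window result is the window's last millisecond (end minus 1 ms, which would explain why it lands in the last minute), and that the tumbling window uses the default offset of 0, so windows are aligned to the epoch. WindowBounds, windowStart and windowEnd are hypothetical names, not part of Flink.

```java
import java.time.Duration;

/** Sketch: recover tumbling-window bounds from a window result's timestamp. */
final class WindowBounds {
    private WindowBounds() {}

    /**
     * Start of the epoch-aligned tumbling window that contains timestampMillis.
     * Assumes a window offset of 0 (hypothetical helper, not a Flink API).
     */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the same window. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        // Example value standing in for context.timestamp(): one millisecond
        // before a 5-minute boundary.
        long resultTimestamp = 1_700_000_099_999L;
        System.out.println("start = " + windowStart(resultTimestamp, size));
        System.out.println("end   = " + windowEnd(resultTimestamp, size));
    }
}
```

In the converter this would be called as WindowBounds.windowStart(context.timestamp(), Duration.ofMinutes(5).toMillis()). The ProcessWindowFunction route is still the more robust one, because it does not depend on the window size and offset being duplicated in the sink.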