java apache-flink flink-streaming

Flink Time Window Start Time for 5 minute window


The start time of my 5 minute tumbling window seems incorrect. The window start time and the current time are always 1 minute apart, but they should be at least 5 minutes apart.

I am performing aggregation on a 5 minute tumbling window. I store the aggregations into DynamoDB. In the DynamoDB record, I store the window timestamp and the current time. The window timestamp is always the last minute of the 5 minute window and the current time is always at the end of the window. The current time seems correct, but the window timestamp should be 5 minutes before the end of the window. The data that is being stored is correct, so I think I must be getting the window start time incorrectly.

I have the aggregation stream defined like:

 DataStream<Aggregation> stream = inputStream
                        .keyBy(new KeySelector())
                        .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
                        .aggregate(new Aggregator());

My Element Converter looks like:

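public class DynamoDbAggregationConverter implements ElementConverter<Aggregation, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(Aggregation agg, SinkWriter.Context context) {
        // Timestamp attached to the record (epoch milliseconds), assumed here to be the window start
        long startWindowTimestamp = context.timestamp();
        // Wall-clock time when the record is written (epoch seconds)
        long createTime = Instant.now().getEpochSecond();
        .....
    }
}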

Solution

  • I've discovered that SinkWriter.Context does not expose the window boundaries (I'm not sure what it is used for).

    A ProcessWindowFunction is needed to get the window boundaries, as described in the documentation.
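A likely explanation for the observed value: a record emitted by a Flink time window carries the window's maximum timestamp (the window end minus 1 ms), and that record timestamp is what SinkWriter.Context reports. The plain-Java sketch below is not Flink code; it only illustrates the arithmetic of epoch-aligned tumbling windows with no offset. The class and method names (`TumblingWindowBounds`, `windowStart`, `startFromMaxTimestamp`, and so on) are hypothetical and chosen for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java illustration of tumbling-window bounds (hypothetical helper, not part of Flink). */
public class TumblingWindowBounds {

    /** Start of the window containing timestampMillis, assuming epoch alignment and no offset. */
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of the window containing timestampMillis. */
    public static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /** Largest timestamp that still belongs to the window (end minus 1 ms). */
    public static long maxTimestamp(long timestampMillis, long sizeMillis) {
        return windowEnd(timestampMillis, sizeMillis) - 1;
    }

    /** Recovers the window start from a record stamped with the window's max timestamp. */
    public static long startFromMaxTimestamp(long maxTimestampMillis, long sizeMillis) {
        return maxTimestampMillis + 1 - sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        long ts = Instant.parse("2024-01-01T10:07:30Z").toEpochMilli();

        long max = maxTimestamp(ts, size);
        System.out.println(Instant.ofEpochMilli(windowStart(ts, size)));            // 2024-01-01T10:05:00Z
        System.out.println(Instant.ofEpochMilli(windowEnd(ts, size)));              // 2024-01-01T10:10:00Z
        System.out.println(Instant.ofEpochMilli(max));                              // 2024-01-01T10:09:59.999Z
        System.out.println(Instant.ofEpochMilli(startFromMaxTimestamp(max, size))); // 2024-01-01T10:05:00Z
    }
}
```

The max timestamp falls in the last minute of the window, which matches the value stored in DynamoDB. In the job itself, the same boundaries are available inside `ProcessWindowFunction.process` as `context.window().getStart()` and `context.window().getEnd()`, and the function can be passed as the second argument to `aggregate(...)` so the window start travels with the Aggregation record to the sink.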