(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)
What could be causing this?
My WindowFn code is simple:
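```java
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
// BQConstants, CommonConstants and DateUtil are project-specific helper classes.

public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

    private static final long serialVersionUID = 1L;

    private IntervalWindow assignWindow(AssignContext context) {
        TableRow tableRow = (TableRow) context.element();
        // Window start: parsed from the row's LOG_TIME column.
        String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
        // Window end: the current system time at the moment the window is assigned.
        String currentTime = DateUtil.getFormatedDate(new Date());
        DateTimeFormatter formatter =
            DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
                .withZoneUTC();
        Instant start_point = Instant.parse(timestamp, formatter);
        Instant end_point = Instant.parse(currentTime, formatter);
        return new IntervalWindow(start_point, end_point);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }

    @Override
    public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
        throw new IllegalArgumentException(
            "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
}
```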
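The default behavior of `GroupByKey` is to output iterables with a timestamp that is the maximum timestamp allowed in the window. A window's end is exclusive, so for your window that is 13:53:08.718Z, one millisecond before the end.

The element has the timestamp 13:53:08.725Z, which does not fall in the window from 13:53:08.088Z to 13:53:08.719Z. Giving it the window's maximum timestamp would move it backwards in time, which is exactly what the exception complains about.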
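To make the arithmetic concrete, here is a minimal, dependency-free Java model using `java.time` instead of Beam's Joda types. The class and method names (`EndOfWindowCheck`, `maxTimestamp`, `contains`) are hypothetical, and this is not Beam's implementation; it only mirrors the two behaviors described above: the window end is exclusive, and grouped output is stamped with the window's maximum timestamp.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical, simplified model of the check behind the exception (not Beam code).
 * It assumes only that a window [start, end) excludes its end, and that the
 * end-of-window timestamp combiner stamps output with the window's max timestamp.
 */
class EndOfWindowCheck {

    // Largest timestamp still inside [start, end): one millisecond before end.
    static Instant maxTimestamp(Instant end) {
        return end.minus(Duration.ofMillis(1));
    }

    // True when ts lies inside the half-open interval [start, end).
    static boolean contains(Instant start, Instant end, Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    public static void main(String[] args) {
        // Values taken from the exception message in the question.
        Instant start = Instant.parse("2017-09-25T13:53:08.088Z");
        Instant end = Instant.parse("2017-09-25T13:53:08.719Z");
        Instant element = Instant.parse("2017-09-25T13:53:08.725Z");

        Instant output = maxTimestamp(end);
        System.out.println("window max timestamp: " + output);
        System.out.println("element in window:    " + contains(start, end, element));
        // Stamping the element with the window's max timestamp would move it earlier.
        System.out.println("moved earlier:        " + output.isBefore(element));
    }
}
```

If the element's timestamp were inside the window, the max timestamp would never be earlier than it, and the combiner would only ever move it later.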
Can you share your `WindowFn`, as well as any `ParDo` you have that adjusts the timestamps?
UPDATE: Thanks for sharing your `WindowFn`. There are a couple of things about it that are going to cause you problems.
1. The start time of the assigned window is not based on the element's timestamp.
You extract a column of the element and assign the window according to the value of `context.element().get(BQConstants.LOG_TIME)` (ignoring casts and parsing). From your error message, it seems that this is not the actual value of `context.timestamp()`, which is the event-time timestamp of the element: the window starts at 13:53:08.088Z, but the element's timestamp is 13:53:08.725Z.
Instead, you should write your `WindowFn` to use `context.timestamp()`. You can make sure the timestamp is what you want in different ways, depending on whether your data is bounded:
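- If your data is bounded, use `WithTimestamps` to assign timestamps by extracting that field.
- If you read unbounded data from Pub/Sub, `PubsubIO` reads timestamps from an attribute you can specify (`withTimestampAttribute`).

2. The end time of the assigned window is based on the system date.

A couple of issues: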
What is the goal here? Are you setting this up just to extract the endpoint for dynamic destinations? If so, I would suggest partitioning your data on when it happened, rather than on when it was processed.
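If the goal is daily table partitions, event-time assignment can be sketched as follows. This assumes one-day UTC partitions (an assumption; the question does not say how the table is partitioned) and models in plain Java the arithmetic of fixed windows aligned to the epoch. In a Beam pipeline, `Window.into(FixedWindows.of(Duration.standardDays(1)))` gives this kind of assignment without a custom `WindowFn`. `EventTimeDailyWindows` and `assign` are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Hypothetical plain-Java model of fixed, non-overlapping windows assigned from an
 * element's event timestamp (not Beam code). The one-day size is an assumed
 * partitioning granularity.
 */
class EventTimeDailyWindows {

    static final Duration SIZE = Duration.ofDays(1); // assumed partition size

    /** Returns {start, end} of the window containing eventTime; end is exclusive. */
    static Instant[] assign(Instant eventTime) {
        long sizeMillis = SIZE.toMillis();
        long ts = eventTime.toEpochMilli();
        // floorMod keeps pre-1970 timestamps in the correct window as well.
        long start = ts - Math.floorMod(ts, sizeMillis);
        return new Instant[] {Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + sizeMillis)};
    }

    public static void main(String[] args) {
        Instant event = Instant.parse("2017-09-25T13:53:08.725Z");
        Instant[] window = assign(event);
        // Depends only on the event time, so re-running it yields the same window.
        System.out.println("[" + window[0] + ".." + window[1] + ")");
    }
}
```

Because the window depends only on the event timestamp, the same element always lands in the same window, and that timestamp always lies inside it, so the end-of-window combiner only ever moves output later, never earlier.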