Tags: java, google-cloud-platform, google-cloud-dataflow, illegalstateexception

Sometimes getting an IllegalStateException while running a pipeline on the Dataflow runner


(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)

What could be the possible causes?

The WindowFn code is simple:

public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

/**
 * 
 */
private static final long serialVersionUID = 1L;

private IntervalWindow assignWindow(AssignContext context) {
    TableRow tableRow = (TableRow) context.element();
    String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
    String currentTime = DateUtil.getFormatedDate(new Date());
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
            .withZoneUTC();
    Instant start_point = Instant.parse(timestamp, formatter);
    Instant end_point = Instant.parse(currentTime, formatter);

    return new IntervalWindow(start_point, end_point);
}

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}

@Override
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
    return Arrays.asList(assignWindow(c));
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return false;
}

@Override
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    throw new IllegalArgumentException(
            "Attempted to get side input window for GlobalWindow from non-global WindowFn");
}

}


Solution

  • The default behavior of GroupByKey is to output iterables with a timestamp that is the maximum timestamp allowed in the window. For your window, that is 13:53:08.718Z, one millisecond before the window's exclusive end.

    The element has the timestamp 13:53:08.725Z, which does not fall in the window from 13:53:08.088Z to 13:53:08.719Z.
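To make the arithmetic concrete, here is a minimal plain-Java sketch of the half-open window check, using the values from the error message. It uses java.time rather than Beam's own types so that it has no dependencies; the class and method names (WindowBoundsSketch, maxTimestamp, contains) are hypothetical and only model the behavior described above.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of a half-open window [start, end) at millisecond precision.
final class WindowBoundsSketch {

    // The last timestamp that still belongs to the window: one millisecond before the end.
    static Instant maxTimestamp(Instant end) {
        return end.minus(Duration.ofMillis(1));
    }

    // True when ts lies inside [start, end): start is inclusive, end is exclusive.
    static boolean contains(Instant start, Instant end, Instant ts) {
        return !ts.isBefore(start) && ts.isBefore(end);
    }

    // True when combining timestamps to the window's maximum would move the element backwards,
    // which is the situation the exception message describes.
    static boolean movesBackwards(Instant end, Instant elementTimestamp) {
        return maxTimestamp(end).isBefore(elementTimestamp);
    }
}
```

With start 13:53:08.088Z, end 13:53:08.719Z and an element at 13:53:08.725Z, contains is false and movesBackwards is true, which matches the reported failure.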

    Can you share your WindowFn as well as any ParDo you have that adjusts the timestamps?

    UPDATE: Thanks for sharing your WindowFn. There are a couple of things about it that are going to cause you problems.

    1. The start time of the assigned window is not based on the element's timestamp.

    You extract a column of the element and assign the window according to the value of context.element().get(BQConstants.LOG_TIME) (ignoring casts and parsing). From your error message, it seems that this is not the actual value of context.timestamp(), which is the event time timestamp of the element.

    Instead, you should write your WindowFn to use context.timestamp(). You can make sure the timestamp is what you want in different ways based on whether your data is bounded:

    • If your data is bounded, you can use WithTimestamps to assign timestamps by extracting that field.
    • If your data is unbounded, the source needs to know more so it can manage the watermark, so the configuration depends on the source. As an example, PubsubIO reads timestamps from an attribute you can specify.
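For the bounded case, the piece you need to supply is a function from the element to its event time; in Beam that function would be handed to WithTimestamps.of(...). The sketch below shows only that extraction step in plain Java. It rests on several assumptions: the field name "log_time" and the pattern "yyyy-MM-dd HH:mm:ss.SSS" are hypothetical stand-ins for BQConstants.LOG_TIME and CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS, a Map stands in for TableRow, and java.time is used to keep the example dependency-free, whereas Beam itself expects a Joda-Time Instant.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Map;

// Hypothetical helper: derives the event time from the row's log-time column.
final class LogTimeExtractor {

    // Assumed field name and pattern; substitute the real constants from your project.
    static final String LOG_TIME = "log_time";
    static final DateTimeFormatter LOG_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Parses the column as a UTC timestamp. The result depends only on the element,
    // so the same row always yields the same event time.
    static Instant eventTime(Map<String, ?> row) {
        String raw = row.get(LOG_TIME).toString();
        return LocalDateTime.parse(raw, LOG_TIME_FORMAT).toInstant(ZoneOffset.UTC);
    }
}
```

Once the element's timestamp is set this way upstream, the WindowFn can rely on context.timestamp() instead of re-parsing the column.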

    2. The end time of the assigned window is based on the system date.

    A couple of issues:

    • The end time is rounded down and may precede the start time, resulting in an invalid window.
    • The end time is nondeterministic. The general expectation in Beam is that you'll deterministically assign a window based primarily on the element's timestamp (which must fall before the end of the window) and secondarily on the element itself. Assigning a nondeterministic window such as this probably has unforeseen drawbacks. One known issue is that your results are not reproducible, which could be trouble if you need to fix a data processing bug or run experiments on archived data. It depends on your use-case, but you might consider something more future-proof.

    What is the goal here? Are you setting this up just to extract the endpoint for dynamic destinations? If so, I would suggest partitioning your data on when it happened, rather than on when it was processed.
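As an illustration of partitioning on when the event happened, here is a minimal plain-Java sketch that assigns each element to the UTC day containing its event timestamp. Daily granularity is an assumption (suggested only by the partition-oriented class name), and DailyWindowSketch and its Window type are hypothetical; in an actual Beam pipeline the built-in FixedWindows covers this kind of assignment.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical model of deterministic, event-time-based window assignment.
final class DailyWindowSketch {

    // Half-open window [start, end) in UTC.
    static final class Window {
        final Instant start;
        final Instant end;

        Window(Instant start, Instant end) {
            // Reject the invalid case where the end does not come after the start.
            if (!start.isBefore(end)) {
                throw new IllegalArgumentException("window end must be after start");
            }
            this.start = start;
            this.end = end;
        }
    }

    // The window depends only on the event timestamp, never on the wall clock,
    // so reprocessing the same data yields the same windows.
    static Window assign(Instant eventTime) {
        Instant start = eventTime.truncatedTo(ChronoUnit.DAYS);
        return new Window(start, start.plus(1, ChronoUnit.DAYS));
    }
}
```

Because the element's timestamp always falls inside the window it is assigned to, the window's maximum timestamp can never be earlier than the element, which avoids the exception from the question.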