apache-flink google-cloud-dataflow apache-beam amazon-kinesis

Apache beam: TTL in State Spec


We are reading from Kinesis and writing to Parquet. We use StateSpec<ValueState<Boolean>> to avoid processing records twice after gracefully stopping the pipeline and relaunching it from the last savepoint.

We noticed that some records were duplicated because, on subsequent relaunches, they landed on a different task manager. The StateSpec<ValueState<Boolean>> stores whether each record (key) has already been processed, so that duplicates are skipped.

We are trying to work out how to clear this state periodically without risking the loss of the most recently processed records, which we still need if another stop follows (i.e., we need something like a TTL on that state).

We considered a timer that clears the state at a fixed interval, but that does not meet our requirements: a global reset would also wipe the most recently processed records, which we need to keep.
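To see why a single periodic reset conflicts with that requirement, here is a minimal plain-Java model (not Beam code; the class and method names are hypothetical). One reset forgets every key at once, including a key processed just before the reset, so a later duplicate of that key is processed again:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical plain-Java model (not Beam code) of deduplication with a single
// periodic reset that wipes every remembered key at once.
class GlobalResetDedup {
    private final Set<String> seen = new HashSet<>();

    // Returns true if the record should be processed (first time seen since the last reset).
    public boolean process(String key) {
        return seen.add(key);
    }

    // Periodic reset: forgets all keys, including ones processed just before the reset.
    public void reset() {
        seen.clear();
    }

    public static void main(String[] args) {
        GlobalResetDedup dedup = new GlobalResetDedup();
        System.out.println(dedup.process("recent")); // true: first occurrence
        System.out.println(dedup.process("recent")); // false: duplicate is skipped
        dedup.reset();                               // periodic timer fires
        System.out.println(dedup.process("recent")); // true again: the duplicate slips through
    }
}
```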

We read that, with event-time processing, state is cleared automatically once a window expires. Would that meet our requirement when using the StateSpec class?

If not, is there a state class with some kind of built-in TTL that we could use to implement this?

Our current code checks whether the element has already been processed, and has a timer callback that clears the state periodically:

    // Per-key flag recording that a record with this key has already been processed.
    @StateId("keyPreserved")
    private final StateSpec<ValueState<Boolean>> keyPreserved = StateSpecs.value(BooleanCoder.of());

    // Processing-time timer meant to clear that flag later.
    @TimerId("resetStateTimer")
    private final TimerSpec resetStateTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

    @ProcessElement
    public void processElement(ProcessContext context,
        @TimerId("resetStateTimer") Timer resetStateTimer,
        @StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
        // read() returns null if nothing has been written for this key yet
        // (firstNonNull is Guava's MoreObjects.firstNonNull).
        if (!firstNonNull(keyPreservedState.read(), false)) {
            T message = context.element().getValue();

            // Process element here

            keyPreservedState.write(true);
        }
    }

    @OnTimer("resetStateTimer")
    public void onResetStateTimer(OnTimerContext context,
        @StateId("keyPreserved") ValueState<Boolean> keyPreservedState) {
        // Clears the flag for the key (and window) this timer was set for.
        keyPreservedState.clear();
    }

Solution

  • Setting the timer every time we call keyPreservedState.write(true) was enough. When the timer fires, keyPreservedState.clear() clears only the state for the key (and window) of the element that set the timer, not the state for every key.
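A minimal plain-Java model of that fix, assuming a simulated processing-time clock in milliseconds (not Beam code; `PerKeyTtlDedup`, `process`, and `fireTimers` are hypothetical names). Each key gets its own deadline when its flag is first written, and an expired deadline removes only that key, which mirrors how a Beam timer and its `@OnTimer` callback are scoped to a single key and window. In the `DoFn` itself, the corresponding change would be to set `resetStateTimer` right after `keyPreservedState.write(true)`, for example with `resetStateTimer.offset(ttl).setRelative()` for some Joda `Duration ttl`, assuming your Beam version's `Timer` provides `offset` and `setRelative`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model (not Beam code): each key has its own "timer",
// set when its "already processed" flag is written.
class PerKeyTtlDedup {
    private final long ttlMillis;
    // key -> processing time at which that key's state is cleared (its pending timer)
    private final Map<String, Long> expiry = new HashMap<>();

    PerKeyTtlDedup(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Mirrors processElement: skip the record if this key still has live state;
    // otherwise process it, write the flag, and set this key's timer.
    public boolean process(String key, long nowMillis) {
        fireTimers(nowMillis);
        if (expiry.containsKey(key)) {
            return false; // duplicate: flag still present
        }
        expiry.put(key, nowMillis + ttlMillis); // like write(true) followed by setting the timer
        return true;
    }

    // Mirrors @OnTimer: each expired timer clears only its own key's state.
    public void fireTimers(long nowMillis) {
        expiry.values().removeIf(deadline -> deadline <= nowMillis);
    }

    public static void main(String[] args) {
        PerKeyTtlDedup dedup = new PerKeyTtlDedup(1_000); // hypothetical 1-second TTL
        System.out.println(dedup.process("a", 0));     // true: first occurrence of a
        System.out.println(dedup.process("b", 900));   // true: first occurrence of b
        System.out.println(dedup.process("a", 1_000)); // true: a's timer fired at 1000
        System.out.println(dedup.process("b", 1_500)); // false: b's state lives until 1900
    }
}
```

Because the deadline is set only on the first write, the TTL in this model counts from the first time a key is processed; re-setting the deadline on every occurrence would instead extend the TTL each time a duplicate arrives.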