
State Time-to-Live: how does it work with Apache Flink CEP patterns?



I read the Apache Flink documentation about State Time-To-Live (TTL): https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
There are two points I don't understand.
1)
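import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

// Expire the value 60 minutes after it was created or last written,
// and never return a value that has already expired.
StateTtlConfig ttl = StateTtlConfig
    .newBuilder(Time.minutes(60))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

// And I use it in my ProcessFunction:

valueStateDescriptor.enableTimeToLive(ttl);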

Suppose I put an element into the ValueState at 15:00, then stop my job with a savepoint, and only at 17:00 restart the job from that savepoint.
The ValueState will be cleared, am I right?
2) If I use an Apache Flink CEP pattern:

.begin("a")
.where(simpleConditionA)
.followedBy("b")
.where(simpleConditionB)
.within(Time.minutes(60));

Suppose I receive an A element at 15:00, then stop my job with a savepoint, only at 17:00 restart the job from that savepoint, and then receive a B element. The pattern will not match, am I right?
How does TTL work with Apache Flink CEP patterns?
Thanks.

I understand the CEP part; I am actually using ingestion time. Let me try to explain: I use a ProcessFunction with a ValueState and a timer, and I clear the state in the onTimer method. I put an element into the (keyed) state, set a timer for 1 hour, and execute some logic. Basically, the value state plus the timer acts as an output limiter (1 output message per hour). At my company we need to stop the running job (with a savepoint) on the cluster and then, a couple of hours later, restart the job from the last savepoint. Right now I am not using TTL, and after the restart my ValueState.value() is not null. What I want is this: if the restart happens less than an hour after I put the element into the state, ValueState.value() should not be null; if it happens more than an hour later, the value should always be null.
P.S. I use the RocksDB state backend with incremental checkpoints at a 1 s interval. It works perfectly.


Solution

  • Suppose I put an element into the ValueState at 15:00, then stop my job with a savepoint, and only at 17:00 restart the job from that savepoint. The ValueState will be cleared, am I right?

    (1) This ValueState will be effectively gone (it can no longer be read), but I'm not sure whether it will have been physically removed. If your state TTL config includes cleanupFullSnapshot(), then you are guaranteed that the savepoint won't include the state in question, provided you took the savepoint after 16:00. But in this case it appears that neither of those conditions holds, so the state was included in the snapshot. I don't know whether state that has since expired is removed during snapshot restore or during the next clean-up. But since you have specified NeverReturnExpired, it cannot affect the results.
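
To make the timing in (1) concrete, here is a minimal plain-Java sketch. It is a toy model, not Flink code: the class TtlValueModel and its methods are hypothetical names. It assumes, as the Flink documentation describes, that TTL is measured against processing time and that the last-write timestamp is stored alongside the value, so the timestamp survives the savepoint.

```java
import java.time.Duration;
import java.time.Instant;

// Toy model only (hypothetical class, no Flink API involved). It mimics the
// visible behaviour of a value configured with UpdateType.OnCreateAndWrite
// and StateVisibility.NeverReturnExpired.
class TtlValueModel<T> {
    private final Duration ttl;
    private T value;
    private Instant lastWrite; // kept next to the value, so a snapshot carries it along

    TtlValueModel(Duration ttl) {
        this.ttl = ttl;
    }

    // Writing (re)starts the TTL clock; reading does not.
    void update(T newValue, Instant now) {
        value = newValue;
        lastWrite = now;
    }

    // Returns null once the entry is at least `ttl` old, whether or not the
    // stored bytes have been physically cleaned up yet.
    T value(Instant now) {
        if (value == null) {
            return null;
        }
        boolean expired = !now.isBefore(lastWrite.plus(ttl));
        return expired ? null : value;
    }
}
```

With a 60-minute TTL and a write at 15:00, a read at 15:30 returns the element and a read at 17:00 returns null, regardless of whether the job was running in between.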

    How [does] it (ttl) work with Apache Flink CEP Pattern?

    (2) CEP doesn't use state TTL. CEP keeps state around for as long as it may affect the pattern matching, and explicitly clears state once it is no longer needed. From the way you have worded this question, I assume you are using processing time, rather than event time. In that case, the pattern will not have been matched within 60 minutes. But if you were to use event time, then the watermarks would be used to determine how much time has elapsed, and the period of downtime would have no effect on the pattern matching.

    Update:

    I see now that you are using ingestion time, and relying on timers to clear the state. With ingestion time you have the choice to use either event time or processing time timers. If you use processing time timers, then any timers that should have fired while the job was not running will fire immediately after the job is restarted. With event time timers, they will fire as soon as the watermarks reach the times in those timers. Since watermarks are not saved in savepoints, some events will have to flow and be processed (and with periodic watermarks, the auto-watermark interval will have to elapse) before any watermarks will be created.
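
The processing-time case can be pictured with a small plain-Java model. This is only a sketch, not Flink's timer service: ProcessingTimeTimersModel and its methods are hypothetical names, and the model assumes nothing more than that pending timers are restored from the savepoint and compared against the wall clock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model only (hypothetical class, no Flink API involved): a queue of
// timers that fires every timer whose timestamp is at or before "now".
class ProcessingTimeTimersModel {
    // Pending timer timestamps in epoch milliseconds; in the real job these
    // would be part of the state restored from the savepoint.
    private final TreeSet<Long> timers = new TreeSet<>();

    void register(long fireAtMillis) {
        timers.add(fireAtMillis);
    }

    // Called whenever the wall clock is observed, including right after a
    // restart. Returns the timers that fire, earliest first, and removes them.
    List<Long> advanceTo(long nowMillis) {
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= nowMillis) {
            fired.add(timers.pollFirst());
        }
        return fired;
    }
}
```

A timer registered for 16:00 does not fire at 15:30, but the first advanceTo call after a 17:00 restart returns it at once. For the output limiter described in the question, this suggests that the onTimer cleanup would run shortly after the restart when processing-time timers are used.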