Tags: kotlin, apache-flink, flink-streaming, stream-processing

How to expire keyed state with TTL in Apache Flink?


I have a pipeline like this:

    env.addSource(kafkaConsumer)
        .keyBy { value -> value.f0 }
        .window(EventTimeSessionWindows.withGap(Time.minutes(2)))
        .reduce(::reduceRecord)
        .addSink(kafkaProducer)

I want to expire keyed data with a TTL.

Some blog posts point out that I need a ValueStateDescriptor for that. I made one like this:

    val desc = ValueStateDescriptor("val state", MyKey::class.java)
    desc.enableTimeToLive(ttlConfig)

But how do I apply this descriptor to my pipeline so that it actually performs the TTL expiry?


Solution

  • The pipeline you've described doesn't use any keyed state that would benefit from setting state TTL. The only keyed state in your pipeline is the contents of the session windows, and that state is being purged as soon as possible -- as the sessions close. (Furthermore, since you are using a reduce function, that state consists of just one value per key.)

    For the most part, expiring state is only relevant for state you explicitly create, in which case you will have ready access to the state descriptor and can configure it to use State TTL. Flink SQL does create state on your behalf that might not automatically expire, in which case you will need to use Idle State Retention Time to configure it. The CEP library also creates state on your behalf, and in this case you should ensure that your patterns either eventually match or timeout.
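    If you later add an operator that does create its own keyed state -- for example a RichFlatMapFunction that remembers the last event seen per key -- the descriptor from your question is applied inside that function's open() method: you call enableTimeToLive on it and then obtain the state through the runtime context. Here is a minimal sketch, assuming a hypothetical MyEvent type and a 10-minute TTL chosen only for illustration:

        import org.apache.flink.api.common.functions.RichFlatMapFunction
        import org.apache.flink.api.common.state.StateTtlConfig
        import org.apache.flink.api.common.state.ValueState
        import org.apache.flink.api.common.state.ValueStateDescriptor
        import org.apache.flink.api.common.time.Time
        import org.apache.flink.configuration.Configuration
        import org.apache.flink.util.Collector

        // Hypothetical event type, used only for this sketch.
        data class MyEvent(val key: String, val payload: String)

        // Emits the first event seen for each key; the "last seen" state expires
        // automatically once the TTL elapses, so a key that goes quiet is forgotten.
        class DedupWithTtl : RichFlatMapFunction<MyEvent, MyEvent>() {

            private lateinit var lastSeen: ValueState<MyEvent>

            override fun open(parameters: Configuration) {
                val ttlConfig = StateTtlConfig
                    .newBuilder(Time.minutes(10)) // illustrative TTL, not a recommendation
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build()

                val desc = ValueStateDescriptor("last-seen", MyEvent::class.java)
                desc.enableTimeToLive(ttlConfig) // the TTL is attached to the descriptor

                // The descriptor only takes effect when you acquire state with it.
                lastSeen = runtimeContext.getState(desc)
            }

            override fun flatMap(value: MyEvent, out: Collector<MyEvent>) {
                if (lastSeen.value() == null) {
                    out.collect(value)
                }
                lastSeen.update(value)
            }
        }

    Such a function would be applied on a keyed stream, e.g. stream.keyBy { it.key }.flatMap(DedupWithTtl()); your original session-window pipeline needs no change for its own state, which is cleared when the sessions close.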