apache-flink

Does Flink support State TTL for BroadcastState?


With Flink 1.8.1, I am trying to apply State TTL to BroadcastState (using a MapStateDescriptor) like this:

(Holder is a POJO wrapping a private int field, "deger".)

...

        StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
        StateBackend stateBackend = new FsStateBackend("file://.....");
        envStream.setStateBackend(stateBackend);
        envStream.enableCheckpointing(1_000L, CheckpointingMode.EXACTLY_ONCE);

...

        MapStateDescriptor<Integer, Client> clientMapStateDescriptor = new MapStateDescriptor<>(
            "ClientBroadcastState",
            BasicTypeInfo.INT_TYPE_INFO,
            TypeInformation.of(new TypeHint<Client>() {})
        );
        StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.seconds(3))
            // .cleanupFullSnapshot()
            // .cleanupInBackground()
            .cleanupIncrementally(100, false)
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build();
        clientMapStateDescriptor.enableTimeToLive(ttlConfig);

        DataStream<Client> clientDataStream = envStream.fromCollection(clientList);
        // clientDataStream.print("clientDataStream");

        BroadcastStream<Client> clientBroadcastStream = clientDataStream
            .broadcast(clientMapStateDescriptor);

        List<Holder> holderList = new ArrayList<>(count);
        for(int i = 0; i < count; i++) {
            holderList.add(new Holder(i));
        }
        DataStream<Holder> integerHolderDataStream = envStream.fromCollection(holderList);

        BroadcastConnectedStream<Holder, Client> connectedStreams = integerHolderDataStream
            .keyBy("deger")
            .connect(clientBroadcastStream);

        SingleOutputStreamOperator<Row> operator = connectedStreams.process(new KeyedBroadcastProcessFunction<Integer, Holder, Client, Row>() {

            @Override
            public void processElement(Holder value, ReadOnlyContext ctx, Collector<Row> out) throws Exception {
                for (Map.Entry<Integer, Client> entry : ctx.getBroadcastState(clientMapStateDescriptor).immutableEntries()) {
                    Client c = ctx.getBroadcastState(clientMapStateDescriptor).get(entry.getKey());
                    System.out.println(value.getDeger() + " - " + c);
                }
                Thread.sleep(1000L);
            }

            @Override
            public void processBroadcastElement(Client value, Context ctx, Collector<Row> out) throws Exception {
                ctx.getBroadcastState(clientMapStateDescriptor).put(value.getId(), value);
            }

        });

...

holderList contains enough elements to test whether entries in the state are evicted.

But the entries in the BroadcastState never expire.

Things I've tried:

  • Using a different state backend (FsStateBackend)
  • Enabling checkpointing
  • Explicitly accessing map state values

What might I be doing wrong? Does BroadcastState support State TTL?

If it does not, can you provide an example of how to evict entries from BroadcastState (using a MapStateDescriptor)?


Solution

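  • According to FLIP-25, State TTL is supported only for keyed state. BroadcastState is a kind of operator (non-keyed) state, so the TTL configuration on the MapStateDescriptor is not applied to it.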

    Items stored in BroadcastState can only be written or cleared in the processBroadcastElement method of a BroadcastProcessFunction (or KeyedBroadcastProcessFunction). This means you'll have to do the eviction as part of handling the arrival of another broadcast element. You also need to apply exactly the same logic in all of the parallel instances: Flink expects every instance to hold the same contents in BroadcastState, and strange things can happen if you do anything non-deterministic or instance-specific here.

    One solution would be to broadcast special records that the recipients interpret as commands to expire earlier entries from the broadcast state.
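The command-record idea can be sketched in plain Java, with a HashMap standing in for the broadcast MapState so the logic can be checked without a Flink runtime. Everything here is hypothetical: BroadcastMsg, the expireBefore cutoff, and the assumption that each record carries a timestamp are not part of the question's code, and whatever produces the expire commands upstream is not shown. Because Flink does not guarantee that broadcast elements reach every parallel instance in the same order, the sketch keeps the newest timestamp per key and remembers the highest cutoff seen, so a late upsert that a command has already expired is dropped on every instance.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical broadcast message: either an upsert of a client or a command
// telling every parallel instance to expire entries older than a cutoff.
final class BroadcastMsg {
    final boolean expireCommand;
    final int clientId;   // only meaningful for upserts
    final long timestamp; // upsert: the client's timestamp; command: the cutoff

    private BroadcastMsg(boolean expireCommand, int clientId, long timestamp) {
        this.expireCommand = expireCommand;
        this.clientId = clientId;
        this.timestamp = timestamp;
    }

    static BroadcastMsg upsert(int clientId, long timestamp) {
        return new BroadcastMsg(false, clientId, timestamp);
    }

    static BroadcastMsg expireBefore(long cutoff) {
        return new BroadcastMsg(true, -1, cutoff);
    }
}

// Plain-Java stand-in for what processBroadcastElement would do with the
// broadcast MapState. Values are just timestamps here; in a real job the value
// would be the Client, and only its timestamp matters for eviction.
final class CommandDrivenBroadcastState {
    private final Map<Integer, Long> state = new HashMap<>();
    // In Flink this would also have to live in broadcast state,
    // because a plain field is not checkpointed.
    private long highestCutoff = Long.MIN_VALUE;

    void onBroadcast(BroadcastMsg msg) {
        if (msg.expireCommand) {
            // Remember the highest cutoff seen so far, then drop older entries.
            highestCutoff = Math.max(highestCutoff, msg.timestamp);
            state.values().removeIf(ts -> ts < highestCutoff);
        } else if (msg.timestamp >= highestCutoff) {
            // Skip upserts an earlier command already expired, and keep the
            // newest timestamp per key, so the final contents do not depend on
            // the order in which this instance received the messages.
            state.merge(msg.clientId, msg.timestamp, Long::max);
        }
    }

    boolean contains(int clientId) {
        return state.containsKey(clientId);
    }

    int size() {
        return state.size();
    }
}
```

Inside a real KeyedBroadcastProcessFunction, the same two branches would run in processBroadcastElement against ctx.getBroadcastState(clientMapStateDescriptor): collect the keys to expire from entries() first and then call remove(key) for each, rather than removing while iterating. The highest cutoff could be kept under a second MapStateDescriptor, since broadcast(...) accepts more than one descriptor.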