
How can I solve a busy-time problem in a process function?


I have a Flink (v1.13.3) application that reads an unbounded stream from Kafka. One of my operators is very busy, and its busy value (which I can see in the UI) increases over time. Right after I start the Flink application:

  • sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"}) returns 300-450 ms
  • After more than five hours, sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"}) returns 5-7 s.

This function is very simple; it just uses RocksDB as the state backend:


public class MyObj implements Serializable
{

    private Set<String> distinctValues;

    public MyObj()
    {
        this.distinctValues = new HashSet<>();
    }

    public Set<String> getDistinctValues() {
        return distinctValues;
    }

    public void setDistinctValues(Set<String> values) {
        this.distinctValues = values;
    }
}

public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output> 
{
    
    private transient ValueState<MyObj> state;

    @Override
    public void open(Configuration parameters)
    {

        ValueStateDescriptor<MyObj> stateDescriptor = new ValueStateDescriptor<>("MyObj",
                TypeInformation.of(MyObj.class));
        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
    {
        MyObj stateValue = state.value();
        if (stateValue == null)
        {
            stateValue = new MyObj();
            // Clear the state 10 minutes (in milliseconds) after the record's timestamp
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10 * 60 * 1000L);
        }

        stateValue.getDistinctValues().add(value.getValue());
        if (stateValue.getDistinctValues().size() >= 20)
        {
            state.clear();
        }
        else
        {
            state.update(stateValue);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
    {
        state.clear();
    }
}

NOTE: Before switching to ValueState, I was using ListState. With ListState, flink_taskmanager_job_task_busyTimeMsPerSecond returned 25-30 s:

public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output> 
{

    private transient ListState<String> listState;

    @Override
    public void open(Configuration parameters)
    {
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("myobj", TypeInformation.of(String.class));
        listState = getRuntimeContext().getListState(listStateDescriptor);
    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
    {
        List<String> values = IteratorUtils.toList(listState.get().iterator());

        if (CollectionUtils.isEmpty(values))
        {
            // Clear the state 10 minutes (in milliseconds) after the record's timestamp
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10 * 60 * 1000L);
        }

        if (!values.contains(value.getValue()))
        {
            values.add(value.getValue());
            listState.update(values);
        }
        
        if (values.size() >= 20)
        {
            ...
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
    {
        listState.clear();
    }
}

Solution

  • Some slowdown is to be expected once RocksDB reaches the point where the working state no longer fits in memory. However, in this case you should be able to dramatically improve performance by switching from ValueState to MapState.

    Currently you are deserializing and reserializing the entire HashSet for every record. As these HashSets grow, the cost of each update grows with them, and performance degrades.

    The RocksDB state backend has an optimized implementation of MapState. Each individual key/value entry in the map is stored as a separate RocksDB object, so you can look up, insert, and update entries without having to serialize or deserialize the rest of the map.

    ListState is also optimized for RocksDB (it can be appended to without deserializing the list). In general it's best to avoid storing collections in ValueState when using RocksDB, and use ListState or MapState instead wherever possible.

    Since the heap-based state backend keeps its working state as objects on the heap, it doesn't have the same issues.
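
To make the serialization argument concrete, here is a minimal toy model in plain Java. It is not Flink code and it does not measure RocksDB. It only compares how many bytes one update has to serialize when the whole set is stored as a single blob (as with a collection inside ValueState) and when each element is stored as its own entry (as MapState does on RocksDB). The class name StateLayoutSketch, both method names, the "stateKey/element" key layout, and the use of Java serialization as a stand-in for the real serializer are all assumptions made for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;

/** Toy model, not Flink code: bytes serialized per update under the two state layouts. */
public class StateLayoutSketch {

    /** ValueState-like layout: the whole set is one blob, so every update re-serializes all of it. */
    static int wholeSetUpdateBytes(int sizeAfterUpdate) {
        HashSet<String> set = new HashSet<>();
        for (int i = 0; i < sizeAfterUpdate; i++) {
            set.add("value-" + i);
        }
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(set); // Java serialization stands in for the real serializer (assumption)
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.size();
    }

    /** MapState-like layout: one entry per element, so an update writes only that entry. */
    static int perEntryUpdateBytes(String stateKey, String element) {
        // Hypothetical composite key layout; the real on-disk format differs.
        byte[] entryKey = (stateKey + "/" + element).getBytes(StandardCharsets.UTF_8);
        byte[] entryValue = {1}; // presence marker
        return entryKey.length + entryValue.length;
    }

    public static void main(String[] args) {
        for (int size : new int[] {1, 5, 19}) {
            System.out.println("set size " + size
                    + ": whole set = " + wholeSetUpdateBytes(size) + " bytes"
                    + ", single entry = "
                    + perEntryUpdateBytes("someKey", "value-" + (size - 1)) + " bytes");
        }
    }
}
```

The whole-set cost rises with every element added, while the per-entry cost depends only on the element being written. In the actual job, the equivalent change would be to declare a MapState<String, Boolean> through a MapStateDescriptor, obtain it with getRuntimeContext().getMapState(...), and use contains and put in place of the HashSet operations. MapState has no size method, so the 20-value threshold would need to be tracked separately, for example with a small ValueState<Integer> counter.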