I have a Flink (v1.13.3) application with an unbounded stream (using Kafka), and one of my tasks is very busy. The busy value (which I can see on the UI) also increases over time. When I first start the Flink application, the query
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})
returns 300-450 ms, but after the application has been running for a while the same query returns 5-7 s. The function is quite simple and just uses RocksDB for the state backend:
public class MyObj implements Serializable
{
    private Set<String> distinctValues;

    public MyObj()
    {
        this.distinctValues = new HashSet<>();
    }

    public Set<String> getDistinctValues() {
        return distinctValues;
    }

    public void setDistinctValues(Set<String> values) {
        this.distinctValues = values;
    }
}
public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output>
{
    private transient ValueState<MyObj> state;

    @Override
    public void open(Configuration parameters)
    {
        ValueStateDescriptor<MyObj> stateDescriptor = new ValueStateDescriptor<>("MyObj",
                TypeInformation.of(MyObj.class));
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
    {
        MyObj stateValue = state.value();
        if (stateValue == null)
        {
            // First record for this key: start a fresh set and a 10-minute cleanup timer.
            stateValue = new MyObj();
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + TimeUnit.MINUTES.toMillis(10));
        }
        stateValue.getDistinctValues().add(value.getValue());
        if (stateValue.getDistinctValues().size() >= 20)
        {
            state.clear();
        }
        else
        {
            state.update(stateValue);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
    {
        state.clear();
    }
}
NOTE: Before implementing ValueState, I was just using ListState, but with ListState flink_taskmanager_job_task_busyTimeMsPerSecond returns 25-30 s:
public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output>
{
    private transient ListState<String> listState;

    @Override
    public void open(Configuration parameters)
    {
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("myobj", TypeInformation.of(String.class));
        listState = getRuntimeContext().getListState(listStateDescriptor);
    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
    {
        // Materialize the whole list to check for duplicates.
        List<String> values = IteratorUtils.toList(listState.get().iterator());
        if (CollectionUtils.isEmpty(values))
        {
            ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + TimeUnit.MINUTES.toMillis(10));
        }
        if (!values.contains(value.getValue()))
        {
            values.add(value.getValue());
            listState.update(values);
        }
        if (values.size() >= 20)
        {
            ...
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
    {
        listState.clear();
    }
}
Some slowdown is to be expected once RocksDB reaches the point where the working state no longer fits in memory. However, in this case you should be able to dramatically improve performance by switching from ValueState to MapState.
Currently you are deserializing and reserializing the entire HashSet for every record. As these HashSets grow over time, performance degrades.
The RocksDB state backend has an optimized implementation of MapState. Each individual key/value entry in the map is stored as a separate RocksDB object, so you can look up, insert, and update entries without having to do serde on the rest of the map.
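As a rough illustration, here is a minimal sketch of what MyProcessFunction could look like with MapState, keeping the 20-entry threshold and 10-minute timer from the question. The map keys act as the set members and the Boolean values are placeholders; note that MapState has no size() method, so the sketch counts entries by iterating the keys (a separate counter state could avoid that if it ever became a bottleneck):

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output>
{
    // One RocksDB entry per distinct value; only the touched entry is (de)serialized.
    private transient MapState<String, Boolean> distinctValues;

    @Override
    public void open(Configuration parameters)
    {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor<>("distinctValues", Types.STRING, Types.BOOLEAN);
        distinctValues = getRuntimeContext().getMapState(descriptor);
    }

    @Override
    public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
    {
        if (distinctValues.isEmpty())
        {
            // First value for this key: schedule the 10-minute cleanup timer.
            ctx.timerService().registerProcessingTimeTimer(
                    value.getTimestamp() + TimeUnit.MINUTES.toMillis(10));
        }
        distinctValues.put(value.getValue(), Boolean.TRUE);

        // MapState has no size(); count the keys instead.
        int size = 0;
        for (String ignored : distinctValues.keys())
        {
            size++;
        }
        if (size >= 20)
        {
            distinctValues.clear();
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
    {
        distinctValues.clear();
    }
}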
ListState is also optimized for RocksDB (it can be appended to without deserializing the list). In general, it's best to avoid storing collections in ValueState when using RocksDB, and to use ListState or MapState instead wherever possible.
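For example, where deduplication is not needed, the read-modify-write pattern from the ListState version above can collapse to a single append (a hypothetical fragment):

// Serializes only the new element; RocksDB merges it into the stored list.
listState.add(value.getValue());

// By contrast, this round-trips the entire list through serde on every record:
// List<String> values = IteratorUtils.toList(listState.get().iterator());
// values.add(value.getValue());
// listState.update(values);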
Since the heap-based state backend keeps its working state as objects on the heap, it doesn't have the same issues.
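If you want to compare the two, the backend is chosen in the job setup; a minimal sketch for Flink 1.13, where HashMapStateBackend is the heap-based backend and EmbeddedRocksDBStateBackend is the RocksDB one:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Heap-based: working state lives as objects on the JVM heap (fast, but memory-bound).
env.setStateBackend(new HashMapStateBackend());

// RocksDB: working state is serialized into embedded RocksDB (scales beyond memory).
// env.setStateBackend(new EmbeddedRocksDBStateBackend());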