I am running a TPC-H query with the Flink DataStream API, version 1.10.1. One of the UDFs reads the entire LineItem table and stores it in memory (using the default MemoryStateBackend). At first I was storing the full list of objects, List<LineItem>, but I was running out of memory, so it is better to store only the fields I will actually use. So now I am storing List<Tuple2<Integer, Double>>. I also increased the heartbeat timeout for both the sender and receiver sides (heartbeat.timeout: 100000). With a data source table file of around 500 MB I can execute my query. However, the original size is 725 MB, and at that size I experience lag in the metrics. I also increased the memory for the Task Managers and the Job Manager, but it seems that this is no longer the problem.
jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 100000
taskmanager.memory.flink.size: 12g
taskmanager.memory.jvm-overhead.max: 4g
taskmanager.memory.jvm-metaspace.size: 2048m # default: 1024m
This is my UDF that uses the ListState.
import com.google.common.collect.Iterators;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;

public class OrderKeyedByProcessFunction extends KeyedProcessFunction<Long, Order, Tuple2<Integer, Double>> {

    private ListState<Tuple2<Integer, Double>> lineItemList = null;

    @Override
    public void open(Configuration parameters) {
        try {
            super.open(parameters);
            ListStateDescriptor<Tuple2<Integer, Double>> lineItemDescriptor =
                    new ListStateDescriptor<>("lineItemState",
                            TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {
                            }));
            lineItemList = getRuntimeContext().getListState(lineItemDescriptor);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void processElement(Order order,
                               KeyedProcessFunction<Long, Order, Tuple2<Integer, Double>>.Context context,
                               Collector<Tuple2<Integer, Double>> out) {
        try {
            // Load the line items once, on the first element.
            if (lineItemList != null && Iterators.size(lineItemList.get().iterator()) == 0) {
                LineItemSource lineItemSource = new LineItemSource();
                List<Tuple2<Integer, Double>> lineItems = lineItemSource.getLineItemsRevenueByOrderKey();
                lineItemList.addAll(lineItems);
            }
            // Scan all stored line items for each incoming order.
            for (Tuple2<Integer, Double> lineItem : lineItemList.get()) {
                if (order != null && lineItem.f0.intValue() == (int) order.getOrderKey()) {
                    out.collect(Tuple2.of((int) order.getCustomerKey(), lineItem.f1));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
I am using Prometheus + Grafana to collect the metrics, with these configuration properties in flink-conf.yaml:
# Metrics Reporter
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 9250-9260
and this configuration in /etc/prometheus/prometheus.yml:
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9090']
  - job_name: 'node_exporter'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9100', 'r02:9100', 'r01:9250', 'r04:9250']
  - job_name: 'flink'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9090', 'localhost:9250', 'localhost:9251', 'r02:9250', 'r01:9250', 'r04:9250']
    metrics_path: /
What kind of configuration can I adjust to not experience this lag on the metrics?
This appears to be a case where using Flink state is only making things worse. If the data you are loading into these lists is immutable, and you want to keep it in memory, then the overhead of storing it in ListState isn't buying you anything useful. I say this because there's no reason to checkpoint this state, and because you aren't taking advantage of the RocksDB state backend's ability to spill to disk. So I think you'd be better off using a normal Java map of order keys to lists of tuples. Making that change may be enough to solve your performance problems.
(Also, for what it's worth, in processElement, lineItemList can never be null, since it was initialized in the open method.)
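A minimal sketch of that map-based approach, with a hypothetical class name (RevenueByOrderKey) and without the Flink/TPC-H types from the question, just to show the lookup structure:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java cache: order key -> revenues for that order.
// You would populate it once in open() from your LineItemSource, instead
// of keeping the data in ListState.
public class RevenueByOrderKey {
    private final Map<Integer, List<Double>> revenues = new HashMap<>();

    // Call once per (orderKey, revenue) pair loaded from the source.
    public void add(int orderKey, double revenue) {
        revenues.computeIfAbsent(orderKey, k -> new ArrayList<>()).add(revenue);
    }

    // One hash lookup per incoming Order, instead of scanning every line item.
    public List<Double> lookup(int orderKey) {
        return revenues.getOrDefault(orderKey, new ArrayList<>());
    }
}
```

In processElement you would then iterate only over lookup((int) order.getOrderKey()), which replaces the linear scan over all line items for every order with a single map lookup.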