
How to configure Flink DataStream job to handle an immutable ListState of a table of 725MB?


I am running a TPC-H query with the Flink DataStream API, version 1.10.1. One of my UDFs reads the entire LineItem table and stores it in memory (using the default MemoryStateBackend). At first I stored the full objects as List&lt;LineItem&gt;, but I was running out of memory, so it is better to store only the fields I will actually use; now I store List&lt;Tuple2&lt;Integer, Double&gt;&gt;. I also increased the timeout for requesting and receiving heartbeats on both the sender and receiver sides (heartbeat.timeout: 100000). With a source table file of around 500MB the query executes fine. However, the original file is 725MB, and at that size I experience lag in the metrics. I have also increased the memory for both the TaskManagers and the JobManager, so it seems memory is no longer the problem.

jobmanager.heap.size: 4g # default: 2048m
heartbeat.timeout: 100000
taskmanager.memory.flink.size: 12g
taskmanager.memory.jvm-overhead.max: 4g
taskmanager.memory.jvm-metaspace.size: 2048m # default: 1024m

This is my UDF that uses the ListState.

import com.google.common.collect.Iterators; // Guava

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class OrderKeyedByProcessFunction extends KeyedProcessFunction<Long, Order, Tuple2<Integer, Double>> {
    private ListState<Tuple2<Integer, Double>> lineItemList = null;

    @Override
    public void open(Configuration parameters) {
        try {
            super.open(parameters);
            ListStateDescriptor<Tuple2<Integer, Double>> lineItemDescriptor = new ListStateDescriptor<>("lineItemState",
                    TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {
                    }));
            lineItemList = getRuntimeContext().getListState(lineItemDescriptor);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void processElement(Order order, KeyedProcessFunction<Long, Order, Tuple2<Integer, Double>>.Context context,
            Collector<Tuple2<Integer, Double>> out) {
        try {
            if (lineItemList != null && Iterators.size(lineItemList.get().iterator()) == 0) {
                LineItemSource lineItemSource = new LineItemSource();
                List<Tuple2<Integer, Double>> lineItems = lineItemSource.getLineItemsRevenueByOrderKey();
                lineItemList.addAll(lineItems);
            }

            for (Tuple2<Integer, Double> lineItem : lineItemList.get()) {
                if (order != null && (lineItem.f0.intValue() == ((int) order.getOrderKey()))) {
                    out.collect(Tuple2.of((int) order.getCustomerKey(), lineItem.f1));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

I am using Prometheus + Grafana to collect the metrics, with these configuration properties in flink-conf.yaml:

# Metrics Reporter
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
metrics.reporter.prom.host: 127.0.0.1
metrics.reporter.prom.port: 9250-9260

and this configuration in /etc/prometheus/prometheus.yml:

global:
  scrape_interval: 15s

scrape_configs:
  - job_name: 'prometheus'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9090']
  - job_name: 'node_exporter'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9100', 'r02:9100', 'r01:9250', 'r04:9250']
  - job_name: 'flink'
    scrape_interval: 5s
    static_configs:
      - targets: ['localhost:9090', 'localhost:9250', 'localhost:9251', 'r02:9250', 'r01:9250', 'r04:9250']
    metrics_path: /


What configuration can I adjust so that I no longer experience this lag in the metrics?


Solution

  • This appears to be a case where using Flink state is only making things worse. If the data you are loading into these lists is immutable, and you want to keep it in memory, then the overhead of storing it in ListState isn't buying you anything useful. I say this because there's no reason to checkpoint this state, and because you aren't taking advantage of the RocksDB state backend to have it spill to disk. So I think you'd be better off using a normal Java map of order keys to lists of tuples. Making that change may be enough to solve your performance problems.

    (Also, for what it's worth, in processElement, lineItemList can never be null, since it was initialized in the open method.)
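To make that suggestion concrete, here is a minimal sketch of the plain-map approach, kept deliberately free of Flink dependencies. The LineItemCache class and its method names are hypothetical, not from the question; the idea is to group the (orderKey, revenue) pairs once, up front, so that each lookup by order key is O(1) instead of a full scan of the list on every processElement call:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: an ordinary in-memory map keyed by order key,
// replacing the ListState scan with a constant-time lookup.
public class LineItemCache {
    // orderKey -> revenues of the line items belonging to that order
    private final Map<Integer, List<Double>> revenueByOrderKey = new HashMap<>();

    // Loaded once (e.g. in open(), from something like LineItemSource).
    public void load(List<Map.Entry<Integer, Double>> lineItems) {
        for (Map.Entry<Integer, Double> item : lineItems) {
            revenueByOrderKey
                .computeIfAbsent(item.getKey(), k -> new ArrayList<>())
                .add(item.getValue());
        }
    }

    // O(1) lookup; an empty list for orders with no line items.
    public List<Double> revenuesFor(int orderKey) {
        return revenueByOrderKey.getOrDefault(orderKey, List.of());
    }
}
```

In the UDF, such a map could be built once in open() and then processElement would simply look up order.getOrderKey() and emit one tuple per revenue value, with no state descriptor, no checkpointing overhead, and no repeated iteration over the whole table.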