Tags: java, timeout, apache-flink, taskmanager

Flink Task Manager timeout


My program gets very slow as more and more records are processed. I initially thought it was due to excessive memory consumption, since my program is String intensive (I am using Java 11, so compact strings should be used whenever possible), so I increased the JVM heap:

-Xms2048m
-Xmx6144m

I also increased the task manager's memory, as well as the heartbeat timeout, in flink-conf.yaml:

jobmanager.heap.size: 6144m
heartbeat.timeout: 5000000

However, none of this helped with the issue. The program still gets very slow at about the same point, which is after processing roughly 3.5 million records, with only about 0.5 million left to go. As the program approaches the 3.5 million mark it slows to a crawl until it eventually times out; total execution time is about 11 minutes.

I checked the memory consumption in VisualVM, but it never goes above about 700 MB. My Flink pipeline looks as follows:

final StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment(1);
environment.setParallelism(1);
DataStream<Tuple> stream = environment.addSource(new TPCHQuery3Source(filePaths, relations));
stream.process(new TPCHQuery3Process(relations)).addSink(new FDSSink());
environment.execute("FlinkDataService");

The bulk of the work is done in the process function, where I implement database join algorithms; the columns are stored as Strings. Specifically, I am implementing query 3 of the TPC-H benchmark (see https://examples.citusdata.com/tpch_queries.html if you wish).
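For readers unfamiliar with the join style described, here is a minimal sketch of an in-memory hash join of the kind a process function like this might run. The class, method, and field names are illustrative assumptions, not the original code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HashJoinSketch {
    // Classic hash join: build a map on one relation's join key, then
    // probe it while iterating the other relation row by row.
    static List<String> hashJoin(Map<String, String> build, List<String[]> probe) {
        List<String> out = new ArrayList<>();
        for (String[] row : probe) {           // row[0] = join key, row[1] = payload
            String match = build.get(row[0]);
            if (match != null) {
                out.add(match + "|" + row[1]); // emit the joined row
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> orders = new HashMap<>();
        orders.put("1", "1995-03-13");          // orderkey -> orderdate
        List<String[]> lineitems = new ArrayList<>();
        lineitems.add(new String[] {"1", "32151.78"});
        lineitems.add(new String[] {"2", "46929.18"});
        System.out.println(hashJoin(orders, lineitems)); // [1995-03-13|32151.78]
    }
}
```

Because every key and payload here is a String, the build map is exactly the kind of structure where duplicated string values pile up.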

The timeout error is this:

java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id <id> timed out.

Once I got this error as well:

Exception in thread "pool-1-thread-1" java.lang.OutOfMemoryError: Java heap space

I also captured a VisualVM monitoring screenshot at the point where things get very slow.

Here is the run loop of my source function:

while (run) {
    readers.forEach(reader -> {
        try {
            String line = reader.readLine();
            if (line != null) {
                Tuple tuple = lineToTuple(line, counter.get() % filePaths.size());
                if (tuple != null && isValidTuple(tuple)) {
                    sourceContext.collect(tuple);
                }
            } else {
                closedReaders.add(reader);
                if (closedReaders.size() == filePaths.size()) {
                    System.out.println("ALL FILES HAVE BEEN STREAMED");
                    cancel();
                }
            }
            counter.getAndIncrement();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });
}

I basically read a line from each of the 3 files I need and, based on the order of the files, construct a tuple object (my custom Tuple class representing a row in a table), then emit that tuple if it is valid, i.e. it fulfils certain conditions on the date.
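As a point of reference for the line-to-tuple step: TPC-H's generated .tbl files are pipe-delimited, so a helper like the one below could do the field split. This is a sketch under that assumption; the method name and the Tuple construction details are hypothetical, not the original lineToTuple:

```java
public class LineSplitSketch {
    // TPC-H .tbl rows are pipe-delimited and usually end with a trailing "|".
    // String.split with the default limit drops trailing empty fields,
    // so the trailing delimiter does not produce a spurious empty column.
    static String[] splitLine(String line) {
        return line.split("\\|");
    }

    public static void main(String[] args) {
        String[] fields = splitLine("1|36901|O|173665.47|1996-01-02|");
        System.out.println(fields.length); // 5
    }
}
```

Each element of the returned array would then become one column of the Tuple.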

I am also suggesting that the JVM run garbage collection at the 1 millionth, 1.5 millionth, 2 millionth and 2.5 millionth records, like this:

System.gc();

Any thoughts on how I can optimize this?


Solution

  • String.intern() saved me. I interned every string before storing it in my maps, and that worked like a charm.
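A minimal sketch of what that interning pattern looks like (the map and method names are illustrative, not the original code). intern() returns the single pooled instance for a string's contents, so millions of rows carrying the same column value end up sharing one object instead of each holding its own copy:

```java
import java.util.HashMap;
import java.util.Map;

public class InternSketch {
    // Hypothetical column store: intern both key and value before storing,
    // so repeated values (order priorities, ship modes, dates, ...) all
    // resolve to one shared instance in the JVM string pool.
    static final Map<String, String> columns = new HashMap<>();

    static void store(String key, String value) {
        columns.put(key.intern(), value.intern());
    }

    public static void main(String[] args) {
        String fromFile = new String("URGENT"); // a distinct heap object, as if parsed from a file
        store("o_orderpriority", fromFile);
        // The stored value is the pooled instance, identical (==) to the literal.
        System.out.println(columns.get("o_orderpriority") == "URGENT"); // true
    }
}
```

The memory win comes from deduplication; it can also reduce GC pressure, which fits the slowdown pattern described in the question.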