Search code examples
apache-flinkflink-streamingapache-iceberg

Apache Fink & Iceberg: Not able to process hundred of RowData types


I have a Flink application that reads arbitrary AVRO data, maps it to RowData and uses several FlinkSink instances to write data into ICEBERG tables. By arbitrary data I mean that I have 100 types of AVRO messages, all of them with a common property "tableName" but containing different columns. I would like to write each of these types of messages into a separated Iceberg table.

For doing this I'm using side outputs: when I have my data mapped to RowData I use a ProcessFunction to write each message into a specific OutputTag.

Later on, with the datastream already processed, I loop into the different output tags, get records using getSideOutput and create an specific IcebergSink for each of them. Something like:


        final List<OutputTag<RowData>> tags = ... // list of all possible output tags

        final DataStream<RowData> rowdata = stream
                .map(new ToRowDataMap()) // Map Custom Avro Pojo into RowData
                .uid("map-row-data")
                .name("Map to RowData")
                .process(new ProcessRecordFunction(tags)) // process elements one by one sending them to a specific OutputTag
                .uid("id-process-record")
                .name("Process Input records");;

        CatalogLoader catalogLoader = ...
        String upsertField = ...
     
        outputTags
                .stream()
                .forEach(tag -> {
                    SingleOutputStreamOperator<RowData> outputStream = stream
                            .getSideOutput(tag);

                    TableIdentifier identifier = TableIdentifier.of("myDBName", tag.getId());

                    FlinkSink.Builder builder = FlinkSink
                            .forRowData(outputStream)
                            .table(catalog.loadTable(identifier))
                            .tableLoader(TableLoader.fromCatalog(catalogLoader, identifier))
                            .set("upsert-enabled", "true")
                            .uidPrefix("commiter-sink-" + tableName)
                            .equalityFieldColumns(Collections.singletonList(upsertField));
                    builder.append();
                });

It works very well when I'm dealing with a few tables. But when the number of tables scales up, Flink cannot acquire enough task resources since each Sink requires two different operators (because of the internals of https://iceberg.apache.org/javadoc/0.10.0/org/apache/iceberg/flink/sink/FlinkSink.html).

Is there any other way of doing this or optimizing it so that it can acquire enough task resources?


Solution

  • Given your question, I assume that about half of your operators are IcebergStreamWriter which are fully utilised and another half is IcebergFilesCommitter which are rarely used.

    You can optimise the resource usage of the servers by:

    • Increasing the number of slots on the TaskManagers (taskmanager.numberOfTaskSlots) [1] - so the CPU not utilised by the idle IcebergFilesCommitter Operators are then used by the other operators on the TaskManager
    • Increasing the resources provided to the TaskManagers (taskmanager.memory.process.size) [2] - this helps by distributing the JVM Memory overhead between the running Operators on this TaskManager (do not forget to increase the slots in parallel this change to start using the extra resource)

    The possible downside in adding more slots for the TaskManagers could cause Operators competing for CPU, and the memory is still reserved for the "idle" tasks. [3]

    Maybe this Flink architecture could useful too.