apache-flink

Flink batch join performance


I've been testing a simple join with both the Table API and the DataStream API in batch mode. However, I've been getting pretty bad results, so I must be doing something wrong. The datasets being joined are ~900 GB and 3 GB. The test environment is EMR with 10 m5.xlarge worker nodes.

The Table API approach creates tables over the source S3 paths and runs an INSERT INTO statement into a table created over the destination S3 path. I tried tweaking task manager memory, numberOfTaskSlots, and parallelism, but couldn't get it to finish in an acceptable time (it takes at least 1.5 h).
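For reference, a minimal sketch of that Table API setup; the table names, columns, formats, and S3 paths here are placeholders, not from the question:

```scala
// Hypothetical sketch of the Table API approach described above;
// table names, columns, formats, and S3 paths are placeholders.
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

val tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode())

tEnv.executeSql(
  """CREATE TABLE big_input (
    |  col1 STRING, col2 STRING, col3 STRING
    |) WITH (
    |  'connector' = 'filesystem',
    |  'path' = 's3://bucket/big/',   -- placeholder path
    |  'format' = 'csv'
    |)""".stripMargin)
// ... similar CREATE TABLE statements for the small input and the sink ...

// The join runs as a single INSERT INTO, so the planner chooses the
// join strategy (e.g. broadcasting the small 3 GB side).
tEnv.executeSql(
  """INSERT INTO sink
    |SELECT b.col1, s.col2, b.col3
    |FROM big_input b
    |JOIN small_input s ON b.col1 = s.col2""".stripMargin)
```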

When using the DataStream API in batch mode, I always hit a problem where YARN kills the task for using over 90% of disk space. So I'm not sure whether that's caused by my code, or whether Flink simply needs much more disk space than Spark does. Reading in the DataStreams:

    val sourceStream: DataStream[SourceObj] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "someSourceName")
      .map(x => SourceObj.convertFromString(x))

Joining:

    val joinedStream = sourceStream.join(sourceStream2)
      .where(s => s.col1)   // key selector on elements of sourceStream
      .equalTo(c => c.col2) // key selector on elements of sourceStream2
      .window(GlobalWindows.create())
      .trigger(CountTrigger.of(1))
      .apply {
        (s, c) => JoinedObj(c.col1, s.col2, s.col3)
      }

Am I missing something, or do I just need to scale up the cluster?


Solution

  • In general you're better off implementing relational workloads with Flink's Table/SQL API, so that its optimizer has a chance to help out.

    But if I'm reading this correctly, this particular join is going to be quite expensive to execute because nothing is ever expired from state. Both tables will be fully materialized within Flink, because for this query, every row of input remains relevant and could affect the result.

    If you can convert this into some sort of join with a temporal constraint that the optimizer can use to free up rows that are no longer useful, it will be much better behaved.
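As one possible shape for such a temporal constraint, a DataStream interval join only keeps rows within the time bound in state. This is an illustrative sketch, not the asker's code: the field names, the second stream's type, and the one-hour bound are all assumptions, and it presumes both streams carry event-time timestamps:

```scala
// Illustrative interval join; SourceObj2, the field names, and the
// one-hour bound are assumptions, not taken from the original question.
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

sourceStream
  .keyBy(s => s.col1)
  .intervalJoin(sourceStream2.keyBy(c => c.col2))
  // Only rows within +/- 1 hour of each other can join, so state
  // for older rows can be freed instead of growing without bound.
  .between(Time.hours(-1), Time.hours(1))
  .process(new ProcessJoinFunction[SourceObj, SourceObj2, JoinedObj] {
    override def processElement(
        s: SourceObj,
        c: SourceObj2,
        ctx: ProcessJoinFunction[SourceObj, SourceObj2, JoinedObj]#Context,
        out: Collector[JoinedObj]): Unit =
      out.collect(JoinedObj(c.col1, s.col2, s.col3))
  })
```

The equivalent in SQL is an interval join with a `BETWEEN` predicate on the two tables' time attributes, which lets the Table/SQL planner apply the same state cleanup.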