scala, apache-flink

Compute Number of Attributes efficiently in flink


I am trying to get the number of attributes of a Flink DataSet. My first attempt was simply:

dataset.input
    .map(_.vector.size)
    .reduce((_, b) => b)
    .collect
    .head

Then, looking at the implementation of Solver, I saw that it does it this way:

// TODO: Faster way to do this?
dataset.map(_.vector.size)
    .reduce((a, b) => b)

But the TODO comment says it all.

So, I came up with this implementation:

dataset.first(1)
    .map(_.vector.size)
    .reduce((_, b) => b)
    .collect

Is there a more efficient implementation?


Solution

  • The fastest would be

    dataset
      // only forward first vector of each partition
      .mapPartition(in => if (in.hasNext) Seq(in.next) else Seq())
      // move all remaining vectors to a single partition, compute size of the first and forward it
      .mapPartition(in => if (in.hasNext) Seq(in.next.vector.size) else Seq()).setParallelism(1)
      .collect
    

    Using reduce or groupReduce is less efficient because they might move all data to a single machine without reducing it first, and they result in one function call per input record. mapPartition is called once per partition and only forwards the first element of its input iterator.
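
    To illustrate the idea outside of a Flink cluster, here is a minimal local sketch of the two-stage mapPartition approach. The `LabeledVector` case class and the `partitions` value are hypothetical stand-ins for the distributed `DataSet[LabeledVector]` and its partitions; only the iterator logic mirrors the answer's code.

    ```scala
    object FirstVectorSize {
      // Hypothetical stand-in for the element type of the DataSet.
      case class LabeledVector(vector: Vector[Double])

      // Stage 1: forward only the first element of each partition.
      def firstPerPartition(partitions: Seq[Iterator[LabeledVector]]): Seq[LabeledVector] =
        partitions.flatMap(in => if (in.hasNext) Seq(in.next()) else Seq.empty)

      // Stage 2 (parallelism 1): size of the first surviving vector, if any.
      def sizeOfFirst(candidates: Seq[LabeledVector]): Option[Int] = {
        val in = candidates.iterator
        if (in.hasNext) Some(in.next().vector.size) else None
      }

      def main(args: Array[String]): Unit = {
        // Simulated partitions: two non-empty, one empty.
        val partitions = Seq(
          Iterator(LabeledVector(Vector(1.0, 2.0, 3.0)), LabeledVector(Vector(4.0, 5.0, 6.0))),
          Iterator(LabeledVector(Vector(7.0, 8.0, 9.0))),
          Iterator.empty[LabeledVector]
        )
        println(sizeOfFirst(firstPerPartition(partitions))) // prints Some(3)
      }
    }
    ```

    Note how each stage touches at most one element per input iterator, which is why this avoids both the per-record function calls and the data movement of a reduce.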