I am trying to get the number of attributes that a Flink DataSet
has. My first attempt was simply:
dataset.input
.map(_.vector.size)
.reduce((_, b) => b)
.collect
.head
Then, looking at the implementation of Solver I saw it does it this way:
// TODO: Faster way to do this?
dataset.map(_.vector.size)
.reduce((a, b) => b)
But the TODO comment says it all.
So I came up with this implementation:
dataset.first(1)
.map(_.vector.size)
.reduce((_, b) => b)
.collect
Is there a more efficient implementation?
The fastest way would be:
dataset
// only forward first vector of each partition
.mapPartition(in => if (in.hasNext) Seq(in.next) else Seq())
// move all remaining vectors to a single partition, compute size of the first and forward it
.mapPartition(in => if (in.hasNext) Seq(in.next.vector.size) else Seq()).setParallelism(1)
.collect
Using reduce or groupReduce is less efficient because it might move data to a single machine without reducing it first, and it results in one function call per input record. mapPartition is called once per partition and only forwards the first element of the input iterator.
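
To make the difference in function calls concrete, here is a minimal sketch in plain Scala that simulates the two approaches without any Flink dependency. The `Sample` class, the `firstOf` helper, and the three-partition layout are assumptions made up for illustration; plain collections stand in for a DataSet, so this only shows how often the user function would be invoked, not any network or scheduling behaviour.

```scala
// Plain-Scala simulation, no Flink involved. `Sample`, `firstOf`, and the
// partition layout below are hypothetical names/data for illustration only.
final case class Sample(vector: Vector[Double])

object FirstElementSketch {
  // Mimics the mapPartition body from the answer: forward at most the
  // first element of the partition's iterator and ignore the rest.
  def firstOf[A](in: Iterator[A]): Seq[A] =
    if (in.hasNext) Seq(in.next()) else Seq.empty

  def main(args: Array[String]): Unit = {
    // Three simulated partitions, one of them empty.
    val partitions: Seq[Seq[Sample]] = Seq(
      Seq.fill(1000)(Sample(Vector(1.0, 2.0, 3.0))),
      Seq.empty,
      Seq.fill(500)(Sample(Vector(4.0, 5.0, 6.0)))
    )

    // reduce-style: the function is invoked once per combined pair of records.
    var reduceCalls = 0
    val viaReduce = partitions.flatten
      .map(_.vector.size)
      .reduce { (_, b) => reduceCalls += 1; b }

    // mapPartition-style: one invocation per partition, then one more on the
    // single partition that collects the forwarded heads.
    var partitionCalls = 0
    val heads = partitions.flatMap { p => partitionCalls += 1; firstOf(p.iterator) }
    partitionCalls += 1
    val viaMapPartition = firstOf(heads.iterator).map(_.vector.size)

    println(s"reduce:       size=$viaReduce, function calls=$reduceCalls")
    println(s"mapPartition: size=${viaMapPartition.head}, function calls=$partitionCalls")
  }
}
```

Both variants report the same vector size, but the reduce-style count grows with the number of records while the mapPartition-style count grows only with the number of partitions.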