scala | apache-spark | dataset | rdd

How can I do this without converting the Dataset to an RDD?

Could someone help me avoid the .rdd conversion in the code below?

val qksDistribution: Array[((String, Int), Long)] = tripDataset
      .map(i => ((i.getFirstPoint.getQk.substring(0, QK_PARTITION_LEVEL), i.getProviderId), 1L))
      .rdd // this drops out of the Dataset API into the RDD API
      .reduceByKey(_ + _)
      .filter(_._2 > maxCountInPartition / 10)
      .collect

Solution

val qksDistribution: Array[((String, Int), Long)] = tripDataset
      .map(i => (i.getFirstPoint.getQk.substring(0, QK_PARTITION_LEVEL), i.getProviderId)) // no need to pair each element with 1L
      .groupByKey(x => x) // similar to keyBy; yields a KeyValueGroupedDataset
      .count // counts per key, producing a Dataset[((String, Int), Long)]
      .filter(_._2 > maxCountInPartition / 10)
      .collect
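
For completeness, the same aggregation can also stay in the untyped DataFrame API, letting Catalyst plan the count without going through groupByKey's typed path. The sketch below is an alternative, not part of the original answer: it reuses tripDataset, QK_PARTITION_LEVEL, and maxCountInPartition from the question and assumes a SparkSession is in scope as spark.

      import org.apache.spark.sql.functions.col
      import spark.implicits._ // tuple encoders for the typed map back

      // Hypothetical alternative: untyped groupBy/count instead of groupByKey
      val qksDistribution: Array[((String, Int), Long)] = tripDataset
        .map(i => (i.getFirstPoint.getQk.substring(0, QK_PARTITION_LEVEL), i.getProviderId))
        .groupBy("_1", "_2")  // group on the tuple's two columns
        .count()              // adds a "count" column per group
        .filter(col("count") > maxCountInPartition / 10)
        .map(r => ((r.getString(0), r.getInt(1)), r.getLong(2))) // back to typed tuples
        .collect()

Both versions avoid the hop to the RDD API; which one performs better depends on the Spark version and the data, so it is worth comparing the plans with explain() before committing to either.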