I'm learning custom Spark RDD Partitioners. I coded some logic, but it doesn't compile.
In Spark 2.4.3, I start the spark-shell:
case class Transaction(name:String, amount:Double, country:String)
val transactions = Seq(
  Transaction("Bob", 100, "UK"),
  Transaction("James", 15, "UK"),
  Transaction("Marek", 51, "US"),
  Transaction("Paul", 57, "US")
)
import org.apache.spark.Partitioner
class CountryPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: Transaction => s.country.hashCode % numPartitions
  }
  override def equals(other: Any): Boolean = other.isInstanceOf[CountryPartitioner]
  override def hashCode: Int = 0
}
val rdd = sc.parallelize(transactions).partitionBy(new CountryPartitioner(2))
The error is
error: value partitionBy is not a member of org.apache.spark.rdd.RDD[Transaction]
rdd.partitionBy(new CountryPartitioner(2))
^
I learned online that the following code works without any error. My code is almost the same as this code; the difference is the Transaction class... I don't know why my code is not working, and I can't even find the relevant RDD API for this online.
import org.apache.spark.Partitioner
class TwoPartsPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    case s: String => if (s(0).toUpper > 'J') 1 else 0
  }
  override def equals(other: Any): Boolean = other.isInstanceOf[TwoPartsPartitioner]
  override def hashCode: Int = 0
}
var x = sc.parallelize(Array(("sandeep",1),("giri",1),("abhishek",1),("sravani",1),("jude",1)), 3)
var y = x.partitionBy(new TwoPartsPartitioner(2))
source: https://gist.github.com/girisandeep/f90e456da6f2381f9c86e8e6bc4e8260
This will not work because partitionBy is only defined for RDDs of key-value pairs (it comes from PairRDDFunctions). Spark's error messages are a little vague at times. Your Transaction class is not a KV pair, so RDD[Transaction] has no partitionBy method.
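A minimal sketch of the fix (my assumption of what you intend, keying by country): map the RDD into (country, transaction) pairs first, so partitionBy becomes available and getPartition receives the String key rather than the whole Transaction:

import org.apache.spark.Partitioner

class CountryPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key match {
    // The key is now the country String, not the Transaction itself.
    // Use a non-negative modulo, since hashCode can be negative.
    case country: String => (country.hashCode % numPartitions + numPartitions) % numPartitions
  }
  override def equals(other: Any): Boolean = other.isInstanceOf[CountryPartitioner]
  override def hashCode: Int = 0
}

// RDD[(String, Transaction)] is a pair RDD, so partitionBy compiles
val rdd = sc.parallelize(transactions)
  .map(t => (t.country, t))
  .partitionBy(new CountryPartitioner(2))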
See Partitioning of Data Frame in Pyspark using Custom Partitioner (the other answer there, not mine).
Many RDD operations are KV-pair oriented, e.g. join, which is not particularly handy.
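To confirm the partitioning worked, you can inspect partition contents with mapPartitionsWithIndex (a standard RDD method); assuming the keyed rdd from the sketch above:

rdd.mapPartitionsWithIndex { (idx, iter) =>
  // Tag each (country, transaction) pair with its partition index
  iter.map { case (country, tx) => s"partition $idx: $country -> ${tx.name}" }
}.collect().foreach(println)

All rows for a given country land in the same partition; which index each country gets depends on its hashCode.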