Implicits in a Spark Scala program not working


I cannot implicitly convert an RDD to a DataFrame in a Scala program, even though I import spark.implicits._.

Any help would be appreciated.

Main Program with the implicits:

object spark1 {

  def main(args: Array[String]) {
    val spark = SparkSession.builder().appName("e1").config("o1", "sv").getOrCreate()

    import spark.implicits._

    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = spark.sparkContext
    val data = sc.textFile("/TestDataB.txt")
    val allSplit = data.map(line => line.split(","))
    case class CC1(LAT: Double, LONG: Double)
    val allData = allSplit.map( p => CC1( p(0).trim.toDouble, p(1).trim.toDouble))
    val allDF = allData.toDF()
    // ... other code
  }

}

Error is as follows:

Error:(40, 25) value toDF is not a member of org.apache.spark.rdd.RDD[CC1]
    val allDF = allData.toDF()


Solution

  • Because the case class CC1 is defined inside the main method, you hit https://issues.scala-lang.org/browse/SI-6649: the compiler cannot provide an implicit TypeTag for a class that is local to a method, so the implicit conversion that supplies toDF() is not applied.

    You can see this in this simple example:

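    import scala.reflect.runtime.universe.TypeTag

    // Top-level class: a TypeTag can be materialized.
    case class Out()
    
    object TestImplicits {
    
      def main(args: Array[String]): Unit = {
        // Class local to the method: no TypeTag is available.
        case class In()
        val typeTagOut = implicitly[TypeTag[Out]] // compiles
        val typeTagIn = implicitly[TypeTag[In]]   // does not compile: Error:(23, 31) No TypeTag available for In
      }
    
    }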
    

    The implicit conversion Spark relies on has the type parameter [T <: Product : TypeTag] (see newProductEncoder in Spark's SQLImplicits.scala), which means an implicit TypeTag[CC1] must be available at the call site.
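
    To make that context bound concrete, here is a minimal sketch that does not use Spark at all. The names Point, ContextBoundSketch and describe are hypothetical; describe only mimics the shape of the bound on newProductEncoder, and the sketch assumes Scala 2 with scala-reflect on the classpath.

```scala
import scala.reflect.runtime.universe.{TypeTag, typeOf}

// Top-level case class, so the compiler can materialize a TypeTag for it.
case class Point(lat: Double, long: Double)

object ContextBoundSketch {

  // Hypothetical stand-in with the same bound as Spark's newProductEncoder:
  // T must be a Product, and an implicit TypeTag[T] must be in scope.
  def describe[T <: Product : TypeTag]: String =
    typeOf[T].typeSymbol.name.toString

  def main(args: Array[String]): Unit = {
    // Compiles because Point is not local to a method.
    println(describe[Point])
  }
}
```

    Moving Point inside main should make the describe[Point] call fail with the same "No TypeTag available" error, which is the situation toDF() runs into with CC1.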

    To fix this, move the definition of CC1 out of the method, or out of the object entirely:

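    import org.apache.spark.sql.SparkSession

    // Defined at the top level so that a TypeTag[CC1] is available.
    case class CC1(LAT: Double, LONG: Double)
    
    object spark1 {
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("e1").config("o1", "sv").getOrCreate()
    
        // Brings the RDD-to-DataFrame conversion (toDF) into scope.
        import spark.implicits._
    
        val data = spark.sparkContext.textFile("/TestDataB.txt")
        val allSplit = data.map(line => line.split(","))
    
        // Parse the first two comma-separated fields of each line as doubles.
        val allData = allSplit.map( p => CC1( p(0).trim.toDouble, p(1).trim.toDouble))
        val allDF = allData.toDF()
        // ... other code
      }
    
    }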