scala, apache-spark, rdd

Transforming RDD[String] to RDD[myclass]


I am trying to transform an RDD[String] into an RDD[Picture] but could not manage it. If I could convert the RDD to RDD[Picture], I would use def hasValidCountry to check whether the latitude and longitude values in the picture metadata are valid, and after that I would check whether the user tags are valid with def hasTags in the Picture class. The problems I encounter:

  1. Implicit conversion found: row ⇒ augmentString(row): scala.collection.immutable.StringOps
  2. type mismatch; found : String required: Array[String]
  3. value InterestingPics is not a member of Array[Nothing] possible cause: maybe a semicolon is missing before `value InterestingPics'?

My intention is to pick the lines that have a valid country and valid tags and transform all of those lines into a new RDD[Picture].
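
In other words, something like the following sketch is what I am aiming for (using originalFlickrMeta from the code below, and assuming Picture can be built from the split fields and exposes hasValidCountry and hasTags; the constructor arguments and field indices are only placeholders):

    val interestingPics: RDD[Picture] = originalFlickrMeta
      .map(line => line.split('\t'))                      // one Array[String] per input line
      .map(fields => Picture(fields(0)))                  // placeholder: build a Picture from the split fields
      .filter(pic => pic.hasValidCountry && pic.hasTags)  // keep only pictures with valid country and tags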

ScalaFile1 (I have updated the Scala file):

  object Part2 {
    def main(args: Array[String]): Unit = {
      var spark: SparkSession = null
      try {
        spark = SparkSession.builder().appName("Flickr using dataframes").config("spark.master", "local[*]").getOrCreate()
        val originalFlickrMeta: RDD[String] = spark.sparkContext.textFile("flickrSample.txt")

        val InterestingPics = originalFlickrMeta.map(row => row.split('\t')).map(field => Picture(field(0).toString())
        InterestingPics.collect
        InterestingPics.take(5).foreach(println)

Solution

  • This works, as an example:

    case class case_for_rdd(c1: Int, c2: String, c3: String)
    
    val rdd_data = spark.sparkContext.textFile("/FileStore/tables/csv01-4.txt")
    val rdd = rdd_data.map(row => row.split(',')).map(field => case_for_rdd(field(0).toInt, field(1), field(2)))
    rdd.collect
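
    For reference, this assumes the input file holds plain comma-separated lines matching the case class fields, for example (hypothetical contents of csv01-4.txt):

    1,a,b
    2,c,d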
    

    A more complicated example: reading into an RDD from a file where one of the fields is an array. The array field needs its own delimiter.

    1,10,100,aa|bb|cc
    2,20,200,xxxxxx|yyyyyyyy|z|aaa
    

    Some sample code follows; use a List for the array field, as otherwise you get to see array addresses when printing (that is what those strange strings are), a point courtesy of smarter people here:

    case class case_for_rdd(c1: Int, c2: String, c3: String, a4: List[String])  
    val rdd_data = spark.sparkContext.textFile("/FileStore/tables/csv03.txt")
    val myCaseRdd = rdd_data.map(row => row.split(',')).map(field => case_for_rdd(field(0).toInt, field(1), field(2), (field(3).split("\\|").toList)))
    myCaseRdd.collect
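
    To illustrate that point about printing (a minimal standalone sketch, unrelated to the Spark job itself): a Scala Array uses the JVM's default toString, which shows the object's type and address, while a List prints its elements.

    Array("aa", "bb", "cc").toString  // something like "[Ljava.lang.String;@1b6d3586"
    List("aa", "bb", "cc").toString   // "List(aa, bb, cc)"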
    

    My advice is to use a DataFrame; the splitting logic is then easier. Also, if you manipulate the RDD via further transformations, the case class typing is lost, whereas an array column with the DataFrame API has no such issue.
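
    For comparison, a sketch of the same read with the DataFrame API (assuming the same spark session and /FileStore/tables/csv03.txt as above; the column names are made up):

    import org.apache.spark.sql.functions.{col, split}

    val df = spark.read
      .option("inferSchema", "true")
      .csv("/FileStore/tables/csv03.txt")
      .toDF("c1", "c2", "c3", "a4")
      .withColumn("a4", split(col("a4"), "\\|"))  // the fourth column becomes an array column
    df.show(false)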