
Converting an RDD into a DataFrame


I am new to Spark/Scala. I have created the RDD below by loading data from multiple paths. Now I want to create a DataFrame from it for further operations. The DataFrame should have the following schema:

schema[UserId, EntityId, WebSessionId, ProductId]

rdd.foreach(println)

545456,5615615,DIKFH6545614561456,PR5454564656445454
875643,5485254,JHDSFJD543514KJKJ4
545456,5615615,DIKFH6545614561456,PR5454564656445454
545456,5615615,DIKFH6545614561456,PR5454564656445454
545456,5615615,DIKFH6545614561456,PR54545DSKJD541054
264264,3254564,MNXZCBMNABC5645SAD,PR5142545564542515
732543,8765984,UJHSG4240323545144
564574,6276832,KJDXSGFJFS2545DSAS

Can anyone please help me?

I have tried this by defining a schema class and mapping it against the RDD, but I get the following error:

"ArrayIndexOutOfBoundsException :3"


Solution

  • If you treat your columns as String values, you can create the DataFrame as follows:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    
    val rdd: RDD[Row] = ???
    
    val df = spark.createDataFrame(rdd, StructType(Seq(
      StructField("UserId", StringType, false),
      StructField("EntityId", StringType, false),
      StructField("WebSessionId", StringType, false),
      StructField("ProductId", StringType, true))))
    

    Note that you must map your RDD to an RDD[Row] for the compiler to let you use the createDataFrame method. For the missing fields you can declare the columns as nullable in the DataFrame schema.
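
    For example, a row that is missing ProductId can carry a null in that position, because that column is declared nullable:

    Row("875643", "5485254", "JHDSFJD543514KJKJ4", null)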

    In your example you are using spark.sparkContext.textFile(). This method returns an RDD[String], which means that each element of your RDD is one line of text. But you need an RDD[Row], so you have to split each string on commas, like:

    val list = List(
      "545456,5615615,DIKFH6545614561456,PR5454564656445454",
      "875643,5485254,JHDSFJD543514KJKJ4",
      "545456,5615615,DIKFH6545614561456,PR5454564656445454",
      "545456,5615615,DIKFH6545614561456,PR5454564656445454",
      "545456,5615615,DIKFH6545614561456,PR54545DSKJD541054",
      "264264,3254564,MNXZCBMNABC5645SAD,PR5142545564542515",
      "732543,8765984,UJHSG4240323545144",
      "564574,6276832,KJDXSGFJFS2545DSAS")
    
    
    val filterReadClicks = spark.sparkContext.parallelize(list)
    
    val rows: RDD[Row] = filterReadClicks.map(_.split(",")).map { arr =>
      // Pad lines missing the trailing ProductId field so that every
      // Row has exactly four columns, in the original column order.
      Row.fromSeq(arr.toSeq.padTo(4, ""))
    }
    
    rows.foreach(el => println(el.toSeq))
    
    val df = spark.createDataFrame(rows, StructType(Seq(
      StructField("UserId", StringType, false),
      StructField("EntityId", StringType, false),
      StructField("WebSessionId", StringType, false),
      StructField("ProductId", StringType, true))))
    
    df.show()
    
    +------+--------+------------------+------------------+
    |UserId|EntityId|      WebSessionId|         ProductId|
    +------+--------+------------------+------------------+
    |545456| 5615615|DIKFH6545614561456|PR5454564656445454|
    |875643| 5485254|JHDSFJD543514KJKJ4|                  |
    |545456| 5615615|DIKFH6545614561456|PR5454564656445454|
    |545456| 5615615|DIKFH6545614561456|PR5454564656445454|
    |545456| 5615615|DIKFH6545614561456|PR54545DSKJD541054|
    |264264| 3254564|MNXZCBMNABC5645SAD|PR5142545564542515|
    |732543| 8765984|UJHSG4240323545144|                  |
    |564574| 6276832|KJDXSGFJFS2545DSAS|                  |
    +------+--------+------------------+------------------+
    

    With the rows RDD you can now create the DataFrame.
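
    As an alternative sketch (assuming a SparkSession named spark, and with Click as a purely illustrative case class name), you can pad the short lines and go through a case class with toDF instead of assembling Row objects by hand:

    import spark.implicits._

    case class Click(UserId: String, EntityId: String, WebSessionId: String, ProductId: String)

    val df2 = filterReadClicks
      .map(_.split(",").padTo(4, ""))          // pad the missing ProductId
      .map(a => Click(a(0), a(1), a(2), a(3)))
      .toDF()

    df2.show()

    For real input you would replace spark.sparkContext.parallelize(list) with spark.sparkContext.textFile(path); the rest stays the same.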