Tags: scala, apache-spark, apache-spark-sql, parquet

Save an RDD of maps to a Parquet file in Scala


I have an RDD[Map[String, String]] and need to convert it to a DataFrame so that I can save the data to a Parquet file, where the map keys become the column names.

For example:

    val inputRdd = spark.sparkContext.parallelize(List(
      Map("city" -> "", "ip" -> "42.106.1.102", "source" -> "PlayStore", "createdDate" -> "2020-04-21"),
      Map("city" -> "delhi", "ip" -> "42.1.15.102", "source" -> "PlayStore", "createdDate" -> "2020-04-21"),
      Map("city" -> "", "ip" -> "42.06.15.102", "source" -> "PlayStore", "createdDate" -> "2020-04-22")))

Output:

city  | ip          | ...
delhi | 42.1.15.102 | ...

Solution

  • Here is some guidance to resolve your problem:

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession
    
    object MapToDfParquet {
    
      val spark = SparkSession
        .builder()
        .appName("MapToDfParquet")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
        .config("spark.app.id","MapToDfParquet") // To silence Metrics warning
        .getOrCreate()
    
      val sc = spark.sparkContext
    
      val sqlContext = spark.sqlContext
    
      def main(args: Array[String]): Unit = {
    
        Logger.getRootLogger.setLevel(Level.ERROR)
    
        try {
    
          import spark.implicits._
    
          // NOTE: joining the values relies on the maps' iteration order;
          // Scala's immutable Map preserves insertion order only for maps
          // with up to 4 entries, so the values here line up with the
          // column names given to toDF below
          val data = Seq(Map("city" -> "delhi", "ip" -> "42.1.15.102", "source" -> "PlayStore", "createdDate" -> "2020-04-21"),
                         Map("city" -> "", "ip" -> "42.06.15.102", "source" -> "PlayStore", "createdDate" -> "2020-04-22"))
            .map(m => m.values.mkString(","))

          // split with limit -1 so trailing empty fields are kept
          val df = sc.parallelize(data)
            .map(str => str.split(",", -1))
            .map(arr => (arr(0), arr(1), arr(2), arr(3)))
            .toDF("city", "ip", "source", "createdDate")
    
          df.show(truncate = false)
    
          // by default Spark writes Parquet with snappy compression;
          // we change this behavior and save as uncompressed Parquet
          sqlContext.setConf("spark.sql.parquet.compression.codec","uncompressed")
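          // alternatively, the same codec can be set per write on the
          // DataFrameWriter: df.write.option("compression", "uncompressed")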
          
          df
            .write
            .parquet("hdfs://quickstart.cloudera/user/cloudera/parquet")
    
          // keep the app alive so you can inspect the Spark web UI at http://localhost:4040/
          println("Type anything in the console to exit......")
          scala.io.StdIn.readLine()
        } finally {
          sc.stop()
          println("SparkContext stopped")
          spark.stop()
          println("SparkSession stopped")
        }
      }
    }
    

    Expected output:

    +-----+------------+---------+-----------+
    |city |ip          |source   |createdDate|
    +-----+------------+---------+-----------+
    |delhi|42.1.15.102 |PlayStore|2020-04-21 |
    |     |42.06.15.102|PlayStore|2020-04-22 |
    +-----+------------+---------+-----------+
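
  • The answer above hardcodes the column names. Since the question asks for the map keys to become the column names, a more general approach is to build the schema from the keys themselves. This is a minimal sketch, not tested against the original environment: it assumes every map carries the same key set and takes the column order from the first record (inputRdd is the RDD from the question):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // take the column names from the first map (assumes all maps share these keys)
    val columns = inputRdd.first().keys.toSeq
    val schema = StructType(columns.map(name => StructField(name, StringType, nullable = true)))

    // build one Row per map, looking each column up by key so map iteration order never matters
    val rows = inputRdd.map(m => Row.fromSeq(columns.map(c => m.getOrElse(c, ""))))

    val df = spark.createDataFrame(rows, schema)
    df.write.parquet("hdfs://quickstart.cloudera/user/cloudera/parquet")

    // read it back to verify the keys became columns
    spark.read.parquet("hdfs://quickstart.cloudera/user/cloudera/parquet").show(truncate = false)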