scala, apache-spark, sqoop, parquet

How to read from a text file (String type data), map, and load the data into Parquet format (multiple columns with different datatypes) in Spark Scala dynamically


We are importing data from a source RDBMS system into the Hadoop environment using Sqoop as text files, and these text files need to be loaded into a Hive table stored as Parquet. How can we approach this scenario without using Hive support (earlier we used Beeline inserts, and we are designing the new pipeline not to use Hive anymore) and write directly to HDFS in Parquet format?

Example: after the Sqoop import, let's say we have a file under the HDFS target dir /data/loc/mydb/Mytable.

Data in Mytable (all columns are of type String):

-----------------------------------------
10|customer1|10.0|2016-09-07  08:38:00.0
20|customer2|20.0|2016-09-08  10:45:00.0
30|customer3|30.0|2016-09-10  03:26:00.0
------------------------------------------

Target Hive table schema:

rec_id: int
rec_name: String
rec_value: Decimal(2,1)
rec_created: Timestamp

How can we load the data from Mytable into the target Hive table location (Parquet format) using Spark, managing the typecasting for all columns dynamically?

Please note: we cannot use HiveContext here. Any help with the approach is much appreciated. Thanks in advance.


Solution

  • The example below reads a .csv file in the same format as presented in the question.

    There are some details that I would like to explain first.

    In the table schema, the field rec_value: Decimal(2,1) would have to be rec_value: Decimal(3,1) for the following reason:

    The DECIMAL type represents numbers with fixed precision and scale. When you create a DECIMAL column, you specify the precision, p, and scale, s. Precision is the total number of digits, regardless of the location of the decimal point. Scale is the number of digits after the decimal point. To represent the number 10.0 without a loss of precision, you need a DECIMAL type with a precision of at least 3 and a scale of at least 1.
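
    For example, a minimal sketch (assuming a local SparkSession under a placeholder app name) shows how the two types behave when the string "10.0" is cast:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().appName("DecimalCheck").master("local[*]").getOrCreate()
    import spark.implicits._

    // DECIMAL(2,1) leaves only one digit before the decimal point, so "10.0"
    // overflows the precision and the cast yields null; DECIMAL(3,1) keeps the value.
    Seq("10.0").toDF("raw")
      .select(
        col("raw").cast("decimal(2,1)").as("as_decimal_2_1"), // null
        col("raw").cast("decimal(3,1)").as("as_decimal_3_1")  // 10.0
      ).show()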

    So the Hive table would be:

    CREATE TABLE tab_data (
      rec_id INT,
      rec_name STRING,
      rec_value DECIMAL(3,1),
      rec_created TIMESTAMP
    ) STORED AS PARQUET;
    

    The full Scala code:

    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField, StructType, TimestampType}
    
    object CsvToParquet {
    
      val spark = SparkSession
        .builder()
        .appName("CsvToParquet")
        .master("local[*]")
        .config("spark.sql.shuffle.partitions","200") //Change to a more reasonable default number of partitions for our data
        .config("spark.sql.parquet.writeLegacyFormat", true) // To avoid issues with data type between Spark and Hive
                                                             // The convention used by Spark to write Parquet data is configurable.
                                                             // This is determined by the property spark.sql.parquet.writeLegacyFormat
                                                             // The default value is false. If set to "true",
                                                             // Spark will use the same convention as Hive for writing the Parquet data.
        .getOrCreate()
    
      val sc = spark.sparkContext
    
      val inputPath = "hdfs://host:port/user/...../..../tab_data.csv"
      val outputPath = "hdfs://host:port/user/hive/warehouse/test.db/tab_data"
    
      def main(args: Array[String]): Unit = {
    
        Logger.getRootLogger.setLevel(Level.ERROR)
    
        try {
    
          val DecimalType = DataTypes.createDecimalType(3, 1)
    
          /**
            * the data schema
            */
          val schema = StructType(List(StructField("rec_id", IntegerType, true), StructField("rec_name",StringType, true),
                       StructField("rec_value",DecimalType),StructField("rec_created",TimestampType, true)))
    
          /**
            * Reading the data from HDFS as .csv text file
            */
          val data = spark
            .read
            .option("sep","|")
            .option("timestampFormat","yyyy-MM-dd HH:mm:ss.S")
            .option("inferSchema",false)
            .schema(schema)
            .csv(inputPath)
    
          data.show(truncate = false)
          data.schema.printTreeString()
    
          /**
            * Writing the data as Parquet file
            */
          data
            .write
            .mode(SaveMode.Append)
            .option("compression", "none") // Assuming no data compression
            .parquet(outputPath)
    
        } finally {
          sc.stop()
          println("SparkContext stopped")
          spark.stop()
          println("SparkSession stopped")
        }
      }
    }
    

    Input file as .csv with pipe-separated fields

    10|customer1|10.0|2016-09-07  08:38:00.0
    20|customer2|24.0|2016-09-08  10:45:00.0
    30|customer3|35.0|2016-09-10  03:26:00.0
    40|customer1|46.0|2016-09-11  08:38:00.0
    ........
    

    reading from Spark

    +------+---------+---------+-------------------+
    |rec_id|rec_name |rec_value|rec_created        |
    +------+---------+---------+-------------------+
    |10    |customer1|10.0     |2016-09-07 08:38:00|
    |20    |customer2|24.0     |2016-09-08 10:45:00|
    |30    |customer3|35.0     |2016-09-10 03:26:00|
    |40    |customer1|46.0     |2016-09-11 08:38:00|
    ......
    

    schema

    root
     |-- rec_id: integer (nullable = true)
     |-- rec_name: string (nullable = true)
     |-- rec_value: decimal(3,1) (nullable = true)
     |-- rec_created: timestamp (nullable = true)
    

    reading from Hive

    SELECT *
    FROM tab_data;
    
    +------------------+--------------------+---------------------+------------------------+--+
    | tab_data.rec_id  | tab_data.rec_name  | tab_data.rec_value  |  tab_data.rec_created  |
    +------------------+--------------------+---------------------+------------------------+--+
    | 10               | customer1          | 10                  | 2016-09-07 08:38:00.0  |
    | 20               | customer2          | 24                  | 2016-09-08 10:45:00.0  |
    | 30               | customer3          | 35                  | 2016-09-10 03:26:00.0  |
    | 40               | customer1          | 46                  | 2016-09-11 08:38:00.0  |
    .....
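
    Since the question also asks about managing the typecasting for all columns dynamically, a possible variation is a sketch along these lines (reusing spark, inputPath, and outputPath from the example above; the columnTypes mapping is a hypothetical stand-in for whatever table metadata you have): read every column as a string and cast each one from a name-to-type list.

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{DataType, DataTypes, IntegerType, StringType, TimestampType}

    // Column -> type mapping; in practice this could be built from table metadata
    // instead of being hard-coded.
    val columnTypes: Seq[(String, DataType)] = Seq(
      "rec_id"      -> IntegerType,
      "rec_name"    -> StringType,
      "rec_value"   -> DataTypes.createDecimalType(3, 1),
      "rec_created" -> TimestampType)

    // Read the Sqoop output with every column as a string and name the columns.
    val raw = spark.read
      .option("sep", "|")
      .csv(inputPath)
      .toDF(columnTypes.map(_._1): _*)

    // Fold over the mapping and cast each column to its target type.
    val typed = columnTypes.foldLeft(raw) { case (df, (name, dataType)) =>
      df.withColumn(name, col(name).cast(dataType))
    }

    typed.write.mode(SaveMode.Append).option("compression", "none").parquet(outputPath)

    The DataFrame written this way ends up with the same schema as the explicit-schema version above.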
    

    Hope this helps.