apache-spark, apache-spark-sql, avro, parquet

How to convert any delimited text file to parquet/avro - dynamically changing column number/structure into avro/parquet using Spark SQL?


We need to convert text data into parquet/avro on a daily basis. The input comes from multiple sources and has different structures, so we would like Spark SQL based Scala code that handles this irrespective of the delimiter and the number of columns or structure.


Solution

  • I have written this code in Spark 2.1.0 - Spark SQL

    Input used

    1238769|Michael|Hoffman|50000|New York
    1238769|Michael1|Hoffman1|50000|New York1
    1238770|Michael2|Hoffman2|50000|New York2
    1238771|Michael3|Hoffman3|50000|New York3
    1238772|Michael4|Hoffman4|50000|New York4
    1238773|Michael5|Hoffman5|50000|New York5
    1238774|Michael6|Hoffman6|50000|New York6
    1238775|Michael7|Hoffman7|50000|New York7
    1238776|Michael8|Hoffman8|50000|New York8
    1238777|Michael9|Hoffman9|50000|New York9
    

    In this example I am going to convert a pipe ("|") delimited text file into parquet.

    Step #1: Reading the input variables

    //Required imports for the whole program
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    //Creating the Spark session
    val spark = SparkSession.builder().appName("Text to Parquet").master("local[*]").getOrCreate()
    import spark.implicits._

    //Assigning values to the variables
    val input_location = args(0).trim
    val delimiter = "\\|" //You can make it dynamic by passing it as an argument
    val selectColString_location = args(1).trim
    val output_location = args(2).trim
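
    If the delimiter also varies per source, a minimal sketch (replacing the hard-coded delimiter line above, and assuming it is passed as a hypothetical fourth argument) could be:

    //Hypothetical alternative: take the delimiter from args(3) and escape it,
    //since split() interprets the delimiter string as a regular expression
    import java.util.regex.Pattern
    val delimiter = Pattern.quote(args(3).trim) //e.g. "|" becomes "\Q|\E"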
    

    Step #2: Reading input text data and splitting as per the delimiter

    //Reading data from text file
    val input_rdd = spark.sparkContext.textFile(input_location)
    
    //Split the input data using the delimiter (we are using pipe (\\|) as the delimiter for this example)
    val input_array_rdd:RDD[Array[String]] = input_rdd.map(x => x.split(delimiter, -1))
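
    As an optional sanity check (purely illustrative, not part of the original flow), the first split record can be printed:

    //Illustrative check: print the fields of the first split record
    input_array_rdd.take(1).foreach(arr => println(arr.mkString(", ")))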
    

    Step #3: Converting the RDD created in step #2 into a DataFrame using toDF, with only a single column - col, which will be an array column

    //Converting input_array_rdd into dataframe with only one column - col
    val input_df:DataFrame = input_array_rdd.toDF("col")
    
    //Creating temp table on top of input_df with the name TABLE1
    input_df.createOrReplaceTempView("TABLE1")
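
    Before writing the select statement, an illustrative look at the temp table confirms that col is a single array<string> column:

    //Illustrative check on the structure of TABLE1
    input_df.printSchema()
    spark.sql("select col[0], col[1] from TABLE1 limit 2").show(false)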
    

    Step #4: Preparing a select statement as per the input structure using the temp table - TABLE1 and the array column - col, and saving it in a text file as a single row

    select cast(col[0] as bigint) as cust_id, col[1] as first_name, col[2] as last_name, cast(col[3] as decimal(18,6)) as amount, col[4] as city from table1
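
    Only this statement needs to change when the input structure changes. For example, a hypothetical three-column feed (the column names below are made up for illustration) could use:

    select cast(col[0] as bigint) as product_id, col[1] as product_name, cast(col[2] as int) as quantity from table1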
    

    Step #5: Reading the select statement from the file and executing it to generate output

    //Reading the selectColString, remember we are reading only the first row from the file
    //Select SQL should be only one row in the selectColString.txt file
    val sqlColString = spark.sparkContext.textFile(selectColString_location).first().toString()
    //Generating the output using the colString
    val output_df = spark.sql(sqlColString)
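
    Optionally (an illustrative safeguard, not in the original code), fail fast when the file's first line is blank:

    //Illustrative safeguard: stop early if no usable select statement was read
    require(sqlColString.trim.nonEmpty, s"No select statement found at $selectColString_location")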
    

    Step #6: Writing the output as parquet

    output_df.write.mode(SaveMode.Overwrite).parquet(output_location)
    

    Output parquet schema

    root
     |-- cust_id: long (nullable = true)
     |-- first_name: string (nullable = true)
     |-- last_name: string (nullable = true)
     |-- amount: decimal(18,6) (nullable = true)
     |-- city: string (nullable = true)
    

    With this single program we can convert all our text files to parquet just by modifying the selectColString file as per the input text.
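
    The question also asks for avro output. A minimal sketch, assuming the external com.databricks:spark-avro package (required on Spark 2.1) is on the classpath and writing to a hypothetical separate output path:

    //Writing the same DataFrame as avro; needs the com.databricks:spark-avro dependency on Spark 2.1
    output_df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save(output_location + "_avro")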

    Github Code Link: https://github.com/sangamgavini/ReusableCodes/tree/master/src/main/scala/com/sangam/TexttoParquet