We need to convert text data into Parquet/Avro on a daily basis. The input comes from multiple sources and has different structures, and we would like Spark SQL based Scala code that achieves this irrespective of the delimiter, the number of columns, or the structure.
I have written this code in Spark 2.1.0 using Spark SQL.
Input used
1238769|Michael|Hoffman|50000|New York
1238769|Michael1|Hoffman1|50000|New York1
1238770|Michael2|Hoffman2|50000|New York2
1238771|Michael3|Hoffman3|50000|New York3
1238772|Michael4|Hoffman4|50000|New York4
1238773|Michael5|Hoffman5|50000|New York5
1238774|Michael6|Hoffman6|50000|New York6
1238775|Michael7|Hoffman7|50000|New York7
1238776|Michael8|Hoffman8|50000|New York8
1238777|Michael9|Hoffman9|50000|New York9
In this example, I am going to convert a pipe ("|") delimited text file into a Parquet file.
Step #1: Reading the input variables
//Required imports
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
//Creating the Spark session
val spark = SparkSession.builder().appName("Text to Parquet").master("local[*]").getOrCreate()
import spark.implicits._
//Assigning values to the variables from the program arguments
val input_location = args(0).trim            //Path of the input text file
val delimiter = "\\|"                        //You can make it dynamic by passing it as an argument (see the sketch below)
val selectColString_location = args(1).trim  //Path of the file holding the select statement
val output_location = args(2).trim           //Path where the Parquet output will be written
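As mentioned in the comment above, the delimiter can be made dynamic instead of being hard-coded. Below is a minimal sketch assuming an extra fourth argument - args(3) - that carries the raw delimiter character (this argument is not part of the original code):
//Hypothetical: read the raw delimiter (e.g. "|" or ",") from an assumed fourth argument
import java.util.regex.Pattern
val rawDelimiter = args(3).trim
//Pattern.quote escapes regex metacharacters such as "|" so split() treats the delimiter literally
val delimiter = Pattern.quote(rawDelimiter)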
Step #2: Reading input text data and splitting as per the delimiter
//Reading data from text file
val input_rdd = spark.sparkContext.textFile(input_location)
//Split the input data using the delimiter (we are using pipe("\\|") as the delimiter for this example)
//The -1 limit keeps trailing empty fields instead of silently dropping them
val input_array_rdd: RDD[Array[String]] = input_rdd.map(x => x.split(delimiter, -1))
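Optionally, rows that do not split into the expected number of fields can be filtered out at this point. This is not part of the original flow (the rest of the walkthrough keeps using input_array_rdd), and expectedColumns is a hypothetical per-feed value:
//Optional defensive check: keep only rows with the expected number of fields
//expectedColumns is a hypothetical per-feed value; for the sample input above it would be 5
val expectedColumns = 5
val filtered_array_rdd: RDD[Array[String]] = input_array_rdd.filter(_.length == expectedColumns)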
Step #3: Converting the RDD created in Step #2 into a DataFrame using toDF, with only a single column - col, which will be an array column
//Converting input_array_rdd into a DataFrame with only one column - col
val input_df: DataFrame = input_array_rdd.toDF("col")
//Creating temp table on top of input_df with the name TABLE1
input_df.createOrReplaceTempView("TABLE1")
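At this point input_df holds a single array column. A quick printSchema() call can be used to confirm the structure; the expected output is shown as comments and should look roughly like this:
//Optional check: input_df should have a single array<string> column named col
input_df.printSchema()
//root
// |-- col: array (nullable = true)
// |    |-- element: string (containsNull = true)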
Step #4: Preparing a select statement as per the input structure, using the temp table TABLE1 and the array column col, and saving it in a text file as a single row
select cast(col[0] as bigint) as cust_id, col[1] as first_name, col[2] as last_name, cast(col[3] as decimal(18,6)) as amount, col[4] as city from table1
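This select statement file is the only thing that changes from feed to feed. For example, for a hypothetical comma-delimited feed with three columns (product_code, product_name, quantity), the selectColString file would instead contain a single row like:
select col[0] as product_code, col[1] as product_name, cast(col[2] as int) as quantity from table1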
Step #5: Reading the select statement from the file and executing it to generate output
//Reading the selectColString, remember we are reading only the first row from the file
//Select SQL should be only one row in the selectColString.txt file
val sqlColString = spark.sparkContext.textFile(selectColString_location).first()
//Generating the output using the colString
val output_df = spark.sql(sqlColString)
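Before writing the output, an optional preview can be used to sanity-check the generated DataFrame (this is not part of the original flow):
//Optional sanity check: preview the first few output rows without truncating column values
output_df.show(5, false)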
Step #6: Writing the output as parquet
output_df.write.mode(SaveMode.Overwrite).parquet(output_location)
Output Parquet schema
root
|-- cust_id: long (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- amount: decimal(18,6) (nullable = true)
|-- city: string (nullable = true)
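The requirement also mentions Avro. Writing the same DataFrame as Avro is a one-line change; below is a minimal sketch assuming the spark-avro package (com.databricks:spark-avro for Spark 2.x) is on the classpath and avro_output_location is an additional output path you supply yourself:
//Writing the output as Avro (requires the spark-avro package; avro_output_location is a hypothetical extra argument)
output_df.write.mode(SaveMode.Overwrite).format("com.databricks.spark.avro").save(avro_output_location)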
With this single program, we are able to convert all of our text files to Parquet by just modifying the selectColString file as per the input text structure.
Github Code Link: https://github.com/sangamgavini/ReusableCodes/tree/master/src/main/scala/com/sangam/TexttoParquet