I have parquet files that are partitioned by the date created (BusinessDate) and the data source (SourceSystem). Some source systems generate their data with different column names (small differences like capitalization, e.g. orderdate vs. OrderDate), but the overall data structure is the same (column order and data types are always identical between files).
My data looks like this in my filesystem:
dataroot
|-BusinessDate=20170809
  |-SourceSystem=StoreA
    |-data.parquet (has column "orderdate")
  |-SourceSystem=StoreB
    |-data.parquet (has column "OrderDate")
Is there a way to read the data in from either dataroot or dataroot/BusinessDate=######/ and somehow normalize it into a uniform schema?
My first attempt was to try:
val inputDF = spark.read.parquet(samplePqt)
val standardNames = Seq(...) // list of uniform column names, in order
val uniformDF = inputDF.toDF(standardNames: _*)
But this does not work: it renames the columns that already match across source systems, but the records from source system B, whose files use the differently-cased column names, come back with null.
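A quick way to see what is happening (a sketch; the path literal and the orderdate column name come from the example layout above, so adjust them to your data): the read resolves a single set of column names for every file, and records from files that spell a column differently come back with null in it.

val sampleDF = spark.read.parquet("dataroot/BusinessDate=20170809")
sampleDF.printSchema() // only one casing of the order-date column appears in the schema
sampleDF.filter(sampleDF("orderdate").isNull)
        .groupBy("SourceSystem")
        .count()
        .show() // the rows with a null order date all come from one source system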
I never did find a way to process all of the data in one pass. My solution iterates through the distinct source systems, creates a file path pointing to each one, and processes them individually. As each source system is processed, its data is transformed into the standard schema and unioned with the other results (a fuller sketch of the loop body follows the snippet below).
val inputDF = spark.read.parquet(dataroot) // dataroot contains the business date partition
val sourceList = inputDF.select("SourceSystem").distinct.collect.map(_.getString(0)).toList // source systems for this business date
sourceList.foreach(println)
for (ss <- sourceList) {
  // process data for this source system (see the sketch below)
}
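For completeness, here is a minimal sketch of what that per-source processing can look like, assuming dataroot is a path string that already includes the BusinessDate partition (as above) and reusing standardNames from my first attempt; it uses map and reduce instead of the for loop so the union is easier to express, and the withColumn line is an optional extra that is not part of the original code.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit

val perSourceDFs: List[DataFrame] = sourceList.map { ss =>
  // Read only this source system's files so every file in the read shares
  // one set of column names, then rename positionally to the standard names.
  spark.read
    .parquet(s"$dataroot/SourceSystem=$ss")
    .toDF(standardNames: _*)
    .withColumn("SourceSystem", lit(ss)) // keep the source label on each row (optional)
}

// Union the per-source results into one DataFrame with the uniform schema.
val uniformDF = perSourceDFs.reduce(_ union _)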