scala, apache-spark, apache-spark-sql, parquet

How to read partitioned parquets with same structure but different column names?


I have parquet files that are partitioned by the date created (BusinessDate) and the data source (SourceSystem). Some source systems generate their data with slightly different column names (small differences such as capitalization, e.g. orderdate vs OrderDate), but the overall data structure is the same (column order and data types are always identical between files).

My data looks like this in my filesystem:

dataroot
|-BusinessDate=20170809
  |-SourceSystem=StoreA
    |-data.parquet (has column "orderdate")
  |-SourceSystem=StoreB
    |-data.parquet (has column "OrderDate")

Is there a way to read the data in from either dataroot or dataroot/BusinessDate=######/ and somehow normalize it into a uniform schema?

My first attempt was to try:

val inputDF = spark.read.parquet(samplePqt)
val standardNames = Seq(...) // list of uniform column names, in order
val uniformDF = inputDF.toDF(standardNames: _*)

But this does not work: it renames the columns that already share names across source systems, but rows coming from source system B, where the column names differ, end up with nulls in those columns.
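
For illustration, reading each partition on its own makes the mismatch visible; the paths and variable names below are assumptions based on the sample layout above:

// hypothetical check, assuming the sample layout above
val storeA = spark.read.parquet("dataroot/BusinessDate=20170809/SourceSystem=StoreA")
val storeB = spark.read.parquet("dataroot/BusinessDate=20170809/SourceSystem=StoreB")
storeA.printSchema() // shows "orderdate"
storeB.printSchema() // shows "OrderDate"

// reading from the root settles on one of the two spellings, and rows from
// the other store come back with nulls in that column, as described above
val wholeDF = spark.read.parquet("dataroot")
wholeDF.printSchema()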


Solution

  • I never did find a way to process all of the data in one pass. My solution iterates through the distinct source systems, builds a filepath pointing at each one, and processes them individually. As each source system is processed, its data is transformed into the standard schema and unioned with the other results (a fuller sketch follows the code below).

    val inputDF = spark.read.parquet(dataroot) // dataroot contains the business date
    val sourceList = inputDF.select(inputDF("SourceSystem")).distinct.collect.map(_.getString(0)).toList // source systems for this business date
    sourceList.foreach(println)
    for (ss <- sourceList) {
      // process the data for this source system
    }
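
    A fuller sketch of that loop, for reference; the per-source path construction and the reduce-based union are assumptions about how the pieces fit together rather than the exact code I ran, and standardNames is the same ordered list of uniform column names from the attempt above (covering only the data columns, since the partition columns are not part of the per-source reads):

    import org.apache.spark.sql.DataFrame

    val dataroot = "dataroot/BusinessDate=20170809" // one business date (assumed path)
    val standardNames = Seq("OrderDate" /*, ...the remaining uniform column names */)

    val sourceList = spark.read.parquet(dataroot)
      .select("SourceSystem").distinct
      .collect.map(_.getString(0)).toList

    // read each source system on its own, rename positionally to the
    // standard schema, then union everything back together
    val uniformDF: DataFrame = sourceList
      .map { ss =>
        spark.read
          .parquet(s"$dataroot/SourceSystem=$ss") // per-source path, assuming the layout above
          .toDF(standardNames: _*)                // positional rename to the uniform names
      }
      .reduce(_ union _)

    Because toDF renames purely by position, this relies on the guarantee above that column order and data types are identical across source systems.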