Search code examples
csvapache-sparkpysparkapache-spark-sqldata-analysis

Combining csv files with mismatched columns


I need to combine multiple csv files into one object (a dataframe, I assume) but they all have mismatched columns, like so:

CSV A

store_location_key | product_key | collector_key | trans_dt | sales | units | trans_key

CSV B

collector_key | trans_dt | store_location_key | product_key | sales | units | trans_key

CSV C

collector_key | trans_dt | store_location_key |product_key | sales | units | trans_id

On top of that, I need these to match with two additional csv files that have a matching column:

Location CSV

store_location_key | region | province | city | postal_code | banner | store_num

Product CSV

product_key | sku | item_name | item_description | department | category

The data types are all consistent, i.e., the sales column is always float, store_location_key is always int, etc. Even if I convert each csv to a dataframe first, I'm not sure that a join would work (except for the last two) because of the way that the columns need to match up.


Solution

  • To merge the first three CSV files, first read the separatly as DataFrames and then use union. The order and number of columns when using union matters, so first you need to add any missing columns to the DataFrames and then use select to make sure the columns are in the same order.

    all_columns = ['collector_key', 'trans_dt', 'store_location_key', 'product_key', 'sales', 'units', 'trans_key', 'trans_id']
    
    dfA = (spark.read.csv("a.csv", header=True)
      .withColumn(trans_id, lit(null))
      .select(all_columns))
    dfB = (spark.read.csv("b.csv", header=True)
      .withColumn(trans_id, lit(null))
      .select(all_columns))
    dfC = (spark.read.csv("c.csv", header=True)
      .withColumn(trans_key, lit(null))
      .select(all_columns))
    
    df = dfA.union(dfB).union(dfC)
    

    Note: If the order/number of columns were the same for the CSV files, they could easily be combined by using a single spark.read operation.

    After merging the first three CSVs, the rest is easy. Both the location and the product CSV can be combined with the rest using join.

    df_location = spark.read.csv("location.csv", header=True)
    df_product = spark.read.csv("product.csv", header=True)
    
    df2 = df.join(df_location, df.store_location_key == df_location.store_location_key)
    df3 = df2.join(df_product, df2.product_key == df_product.product_key)