Tags: apache-spark, dictionary, pyspark, dynamic, multiple-columns

How to dynamically convert dataframe columns to map


How can I convert the data to a map in PySpark when the columns are dynamic?

Input dataframe:

key_column  Column_1  Column_2  .....  Column_N
1           Value_1   Value_2   .....  Value_N
1           Value_a   Value_2   .....  Value_Z
2           Value_1   Value_2   .....  Value_N

Expected output dataframe:

key_column  Map_output
1           {"Column_1":"Value_1, Value_a", "Column_2":"Value_2", ....., "Column_N":"Value_N, Value_Z"}
2           {"Column_1":"Value_1", "Column_2":"Value_2", ....., "Column_N":"Value_N"}

Solution

  • We can use the create_map function with reduce() to build the alternating key/value arguments.

    from functools import reduce
    from pyspark.sql import functions as func

    col_list = ['col1', 'col2', 'col3']  # can use sdf.columns for all columns in the dataframe

    spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
        toDF(['col1', 'col2', 'col3']). \
        withColumn('allcol_map',
                   # flatten the per-column [lit(k), col(k)] pairs into one alternating key, value argument list
                   func.create_map(*reduce(lambda x, y: x + y, [[func.lit(k), func.col(k)] for k in col_list]))
                   ). \
        show(truncate=False)
    
    # +-----+-----+-----+---------------------------------------------+
    # |col1 |col2 |col3 |allcol_map                                   |
    # +-----+-----+-----+---------------------------------------------+
    # |val01|val02|val03|{col1 -> val01, col2 -> val02, col3 -> val03}|
    # |val11|val12|val13|{col1 -> val11, col2 -> val12, col3 -> val13}|
    # +-----+-----+-----+---------------------------------------------+
    
    # root
    #  |-- col1: string (nullable = true)
    #  |-- col2: string (nullable = true)
    #  |-- col3: string (nullable = true)
    #  |-- allcol_map: map (nullable = false)
    #  |    |-- key: string
    #  |    |-- value: string (valueContainsNull = true)
    
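    A note on the flattening step: reduce(lambda x, y: x + y, ...) simply concatenates the per-column [lit(k), col(k)] pairs into one flat argument list for create_map. A minimal alternative sketch (not part of the original answer) does the same flattening with itertools.chain:

    from itertools import chain

    # same flattening as the reduce() above: alternating key literal, column value
    spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
        toDF(['col1', 'col2', 'col3']). \
        withColumn('allcol_map',
                   func.create_map(*chain.from_iterable([func.lit(k), func.col(k)] for k in col_list))
                   ). \
        show(truncate=False)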

    We can also use the map_from_entries function, which takes an array of structs; each struct's two fields become a key-value entry in the map. It produces the same result as above.

    col_list = ['col1', 'col2', 'col3']  # can use sdf.columns for all columns in dataframe
    
    spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
        toDF(['col1', 'col2', 'col3']). \
        withColumn('allcol_map', 
                   func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
                   ). \
        show(truncate=False)
    

    Based on the updated question, you'd like to group by a key column. Looking at the new expected output, you can use concat_ws with collect_list / collect_set to combine all / the unique column values per key.

    col_list = ['col1', 'col2', 'col3']
    
    spark.sparkContext.parallelize([('part0', 'val01', 'val02', 'val03'), ('part0', 'val11', 'val12', 'val13'), ('part1', 'val21', 'val22', 'val23')]). \
        toDF(['key_column', 'col1', 'col2', 'col3']). \
        groupBy('key_column'). \
        agg(*[func.concat_ws(',', func.collect_set(k)).alias(k) for k in col_list]). \
        withColumn('allcol_map', 
                   func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
                   ). \
        show(truncate=False)
    
    # +----------+-----------+-----------+-----------+---------------------------------------------------------------+
    # |key_column|col1       |col2       |col3       |allcol_map                                                     |
    # +----------+-----------+-----------+-----------+---------------------------------------------------------------+
    # |part1     |val21      |val22      |val23      |{col1 -> val21, col2 -> val22, col3 -> val23}                  |
    # |part0     |val01,val11|val02,val12|val03,val13|{col1 -> val01,val11, col2 -> val02,val12, col3 -> val03,val13}|
    # +----------+-----------+-----------+-----------+---------------------------------------------------------------+
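    The allcol_map column above is a MapType. If you want the JSON-style string shown in the question's expected output, you can serialize the map with to_json. A minimal sketch, assuming the grouped dataframe from the previous snippet is held in a variable named grouped_sdf (a name introduced here for illustration):

    # `grouped_sdf` is a hypothetical name for the grouped dataframe built above
    # to_json renders the map as a JSON string, e.g. {"col1":"val01,val11","col2":...}
    grouped_sdf. \
        select('key_column', func.to_json('allcol_map').alias('Map_output')). \
        show(truncate=False)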