How to convert the data to map in PySpark, for dynamic columns?
Input dataframe:
key_column | Column_1 | Column_2 | ..... | Column_N |
---|---|---|---|---|
1 | Value_1 | Value_2 | ..... | Value_N |
1 | Value_a | Value_2 | ...... | Value_Z |
2 | Value_1 | Value_2 | ..... | Value_N |
Expected output dataframe:
key_column | Map_output |
---|---|
1 | {"Column_1":"Value_1, Value_a", "Column_2":"Value_2", ......, "Column_N":"Value_N, Value_Z"} |
2 | {"Column_1":"Value_1", "Column_2":"Value_2", ......, "Column_N":"Value_N"} |
We can use create_map
function with reduce()
.
col_list = ['col1', 'col2', 'col3'] # can use sdf.columns for all columns in dataframe
spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
toDF(['col1', 'col2', 'col3']). \
withColumn('allcol_map',
func.create_map(*reduce(lambda x, y: x + y, [[func.lit(k), func.col(k)] for k in col_list]))
). \
show(truncate=False)
# +-----+-----+-----+---------------------------------------------+
# |col1 |col2 |col3 |allcol_map |
# +-----+-----+-----+---------------------------------------------+
# |val01|val02|val03|{col1 -> val01, col2 -> val02, col3 -> val03}|
# |val11|val12|val13|{col1 -> val11, col2 -> val12, col3 -> val13}|
# +-----+-----+-----+---------------------------------------------+
# root
# |-- col1: string (nullable = true)
# |-- col2: string (nullable = true)
# |-- col3: string (nullable = true)
# |-- allcol_map: map (nullable = false)
# | |-- key: string
# | |-- value: string (valueContainsNull = true)
We can also use map_from_entries
function that requires an array of structs. The struct fields will be converted into the maps. It will output the same result as aforementioned.
col_list = ['col1', 'col2', 'col3'] # can use sdf.columns for all columns in dataframe
spark.sparkContext.parallelize([('val01', 'val02', 'val03'), ('val11', 'val12', 'val13')]). \
toDF(['col1', 'col2', 'col3']). \
withColumn('allcol_map',
func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
). \
show(truncate=False)
Based on the updated situation, you'd like to group by some key columns. Looking at the new expected output, you can use concat_ws
and collect_list
/ collect_set
to club the all / unique column values.
col_list = ['col1', 'col2', 'col3']
spark.sparkContext.parallelize([('part0', 'val01', 'val02', 'val03'), ('part0', 'val11', 'val12', 'val13'), ('part1', 'val21', 'val22', 'val23')]). \
toDF(['key_column', 'col1', 'col2', 'col3']). \
groupBy('key_column'). \
agg(*[func.concat_ws(',', func.collect_set(k)).alias(k) for k in col_list]). \
withColumn('allcol_map',
func.map_from_entries(func.array(*[func.struct(func.lit(k).alias('key'), func.col(k).alias('val')) for k in col_list]))
). \
show(truncate=False)
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
# |key_column|col1 |col2 |col3 |allcol_map |
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+
# |part1 |val21 |val22 |val23 |{col1 -> val21, col2 -> val22, col3 -> val23} |
# |part0 |val01,val11|val02,val12|val03,val13|{col1 -> val01,val11, col2 -> val02,val12, col3 -> val03,val13}|
# +----------+-----------+-----------+-----------+---------------------------------------------------------------+