Tags: python, pyspark

Transpose columns to rows and build dictionaries as values in another column


I have this Spark DataFrame. I am trying to convert the columns to rows, using the gw column as the keys of a dictionary stored in the other column.

+--------------+------+------+
|gw            |rrc   |re_est|
+--------------+------+------+
|210.142.27.137|1400.0|26.0  |
|210.142.27.202|2300  |12    |
+--------------+------+------+

expected output like this:

+------+-----------------------------------------------+
|index |gw_mapping                                     |
+------+-----------------------------------------------+
|rrc   |{210.142.27.137: 1400.0, 210.142.27.202: 2300} |
|re_est|{210.142.27.137: 26.0, 210.142.27.202: 12}     |
+------+-----------------------------------------------+

What I have done:

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list, concat_ws, lit, map_from_arrays, expr, array

# Create a SparkSession
spark = SparkSession.builder.appName("DataFramePivot").getOrCreate()

# Your initial DataFrame
data = [("210.142.27.137", 1400.0, 26.0),
        ("210.142.27.202", 2300.0, 12.0)]

columns = ["gw", "rrc", "re_est"]

df = spark.createDataFrame(data, columns)

# Pivot the DataFrame and format the output
pivot_df = df.groupBy().agg(
    map_from_arrays(collect_list(lit("gw")), collect_list("rrc")).alias("rrc"),
    map_from_arrays(collect_list(lit("gw")), collect_list("re_est")).alias("re_est")
)

# Create an array with 'rrc' and 're_est' keys
keys_array = lit(["rrc", "re_est"])

# Combine the 'rrc' and 're_est' maps into a single map
combined_map = map_from_arrays(keys_array, array(pivot_df['rrc'], pivot_df['re_est']))

# Explode the combined map into separate rows
result_df = combined_map.selectExpr("explode(map) as (index, gw_mapping)")

# Show the result with the desired formatting
result_df.show(truncate=False)

Somehow I am unable to get the expected output.


Solution

  • Stack the DataFrame to convert the value columns to rows, then group by index and collect the list of (gw, value) pairs. Finally, use map_from_entries to convert the list of structs to a map:

    from pyspark.sql import functions as F

    result_df = (
        df
        .select('gw', F.expr("stack(2, 'rrc', rrc, 're_est', re_est) AS (index, value)"))
        .groupby('index')
        .agg(F.expr("map_from_entries(collect_list(struct(gw, value))) as gw_mapping"))
    )
    

    result_df.show(truncate=False)
    +------+----------------------------------------------------+
    |index |gw_mapping                                          |
    +------+----------------------------------------------------+
    |re_est|{210.142.27.137 -> 26.0, 210.142.27.202 -> 12.0}    |
    |rrc   |{210.142.27.137 -> 1400.0, 210.142.27.202 -> 2300.0}|
    +------+----------------------------------------------------+