Tags: python, apache-spark, aws-glue

GroupBy Spark Dataframe and manipulate aggregated data as string


The transformation happens in an AWS Glue Spark job. In the example below I group rows by "item_guid" and "item_name" and aggregate the "option" column into a collection set. A collection set is an array; however, later I need to map it to a Postgres database, so I have to turn that array into a string. Thus,

array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option"))) 

will transform the options into comma-separated strings. However, for Postgres, where the options column has type text[], the string must be enclosed in curly braces and should look like: {90000,86000,81000}

The question: how can I, in the last step of the transformation, turn the options value into a brace-enclosed string like "{90000,86000,81000}"? It seems like a simple trick, but I couldn't come up with an elegant solution.
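To make the target concrete, this is the literal form a Postgres text[] column expects, built here for a single grouped row in plain Python purely as an illustration (the values come from the example data below):

options = ["90000", "86000", "81000"]             # one row's aggregated options
pg_array_literal = "{" + ",".join(options) + "}"  # -> "{90000,86000,81000}"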

Code Example:

from pyspark.sql.functions import collect_list, collect_set, concat_ws, col, lit

simpleData = [("001","1122","YPIA_PROD",90000),
    ("002","1122","YPIA_PROD",86000),
    ("003","1122","YPIA_PROD",81000),
    ("004","1122","YPIA_ABC",90000),
    ("005","1133","YPIA_PROD",99000),
    ("006","1133","YPIA_PROD",83000),
    ("007","1144","YPIA_PROD",79000),
    ("008","1144","YPIA_PROD",80000),
    ("009","1144","YPIA_ABC",91000)
]

rdd = spark.sparkContext.parallelize(simpleData)
df = rdd.toDF(["id","item_guid","item_name","option"])
df.show()

grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))

array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
grouped_df.show()
array_to_string_df.show()

Output of df.show(), grouped_df.show(), and array_to_string_df.show():

+---+----------+---------+------+
| id| item_guid|item_name|option|
+---+----------+---------+------+
|001|      1122|YPIA_PROD| 90000|
|002|      1122|YPIA_PROD| 86000|
|003|      1122|YPIA_PROD| 81000|
|004|      1122| YPIA_ABC| 90000|
|005|      1133|YPIA_PROD| 99000|
|006|      1133|YPIA_PROD| 83000|
|007|      1144|YPIA_PROD| 79000|
|008|      1144|YPIA_PROD| 80000|
|009|      1144| YPIA_ABC| 91000|
+---+----------+---------+------+

+----------+---------+--------------------+
| item_guid|item_name|              option|
+----------+---------+--------------------+
|      1133|YPIA_PROD|      [83000, 99000]|
|      1122|YPIA_PROD|[90000, 86000, 81...|
|      1122| YPIA_ABC|             [90000]|
|      1144|YPIA_PROD|      [79000, 80000]|
|      1144| YPIA_ABC|             [91000]|
+----------+---------+--------------------+

+----------+---------+-----------------+
|item_guid |item_name|           option|
+----------+---------+-----------------+
|      1133|YPIA_PROD|      83000,99000|
|      1122|YPIA_PROD|90000,86000,81000|
|      1122| YPIA_ABC|            90000|
|      1144|YPIA_PROD|      79000,80000|
|      1144| YPIA_ABC|            91000|
+----------+---------+-----------------+

Solution

  • from pyspark.sql.functions import collect_list, collect_set, concat, concat_ws, col, lit

    simpleData = [("001","1122","YPIA_PROD",90000),
        ("002","1122","YPIA_PROD",86000),
        ("003","1122","YPIA_PROD",81000),
        ("004","1122","YPIA_ABC",90000),
        ("005","1133","YPIA_PROD",99000),
        ("006","1133","YPIA_PROD",83000),
        ("007","1144","YPIA_PROD",79000),
        ("008","1144","YPIA_PROD",80000),
        ("009","1144","YPIA_ABC",91000)
    ]

    schema = ["id","item_guid","item_name","option"]
    df = spark.createDataFrame(data=simpleData, schema=schema)
    # df.printSchema()
    df.show(truncate=False)

    # Group and collect the distinct options into an array
    grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))

    # Join the array into a comma-separated string, then wrap it in curly braces
    # to match the Postgres text[] literal form, e.g. {90000,86000,81000}
    array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option"))) \
        .select(col("item_guid"), col("item_name"),
                concat(lit("{"), col("option"), lit("}")).alias("option"))

    array_to_string_df.show()
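
As a side note, on Spark 2.4+ the comma-join and the brace-wrapping can be combined into a single expression with array_join. A minimal sketch, assuming the same grouped_df as above (an alternative approach, not part of the original answer):

from pyspark.sql.functions import array_join, concat, lit, col

# Cast the collected array to array<string>, join it with commas, and wrap it
# in braces to produce the Postgres text[] literal, e.g. {90000,86000,81000}
array_to_string_df = grouped_df.select(
    "item_guid", "item_name",
    concat(lit("{"),
           array_join(col("option").cast("array<string>"), ","),
           lit("}")).alias("option")
)
array_to_string_df.show(truncate=False)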