The transformation is happening in an AWS Glue Spark job. In the example below I group rows by “item_guid” and “item_name” and aggregate the “option” column into a collection set. A collection set is an array; however, later I need to map it to a Postgres database, so I have to turn that array into a string. Thus,
array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
will transform the options into comma-separated strings. However, for Postgres, where the option column has type text[], the string must be enclosed in curly braces and look like: {90000,86000,81000}
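For reference, the target literal is just the comma-joined values wrapped in braces; a minimal plain-Python sketch (using the sample option values from the data below):

values = ["90000", "86000", "81000"]
pg_array_literal = "{" + ",".join(values) + "}"
print(pg_array_literal)  # prints {90000,86000,81000}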
The question: in the last transformation step, how can I turn the option value into a brace-enclosed string like “{90000,86000,81000}”? It seems like a simple trick, but I couldn't come up with an elegant solution.
Code Example:
from pyspark.sql.functions import collect_list, collect_set, concat_ws, col, lit
simpleData = [("001","1122","YPIA_PROD",90000),
("002","1122","YPIA_PROD",86000),
("003","1122","YPIA_PROD",81000),
("004","1122","YPIA_ABC",90000),
("005","1133","YPIA_PROD",99000),
("006","1133","YPIA_PROD",83000),
("007","1144","YPIA_PROD",79000),
("008","1144","YPIA_PROD",80000),
("009","1144","YPIA_ABC",91000)
]
rdd = spark.sparkContext.parallelize(simpleData)
df = rdd.toDF(["id","item_guid","item_name","option"])
df.show()
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))
array_to_string_df = grouped_df.withColumn("option", concat_ws(',', col("option")))
grouped_df.show()
array_to_string_df.show()
DF show output:
+---+----------+---------+------+
| id| item_guid|item_name|option|
+---+----------+---------+------+
|001| 1122|YPIA_PROD| 90000|
|002| 1122|YPIA_PROD| 86000|
|003| 1122|YPIA_PROD| 81000|
|004| 1122| YPIA_ABC| 90000|
|005| 1133|YPIA_PROD| 99000|
|006| 1133|YPIA_PROD| 83000|
|007| 1144|YPIA_PROD| 79000|
|008| 1144|YPIA_PROD| 80000|
|009| 1144| YPIA_ABC| 91000|
+---+----------+---------+------+
+----------+---------+--------------------+
| item_guid|item_name| option|
+----------+---------+--------------------+
| 1133|YPIA_PROD| [83000, 99000]|
| 1122|YPIA_PROD|[90000, 86000, 81...|
| 1122| YPIA_ABC| [90000]|
| 1144|YPIA_PROD| [79000, 80000]|
| 1144| YPIA_ABC| [91000]|
+----------+---------+--------------------+
+----------+---------+-----------------+
|item_guid |item_name| option|
+----------+---------+-----------------+
| 1133|YPIA_PROD| 83000,99000|
| 1122|YPIA_PROD|90000,86000,81000|
| 1122| YPIA_ABC| 90000|
| 1144|YPIA_PROD| 79000,80000|
| 1144| YPIA_ABC| 91000|
+----------+---------+-----------------+
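One approach that works: build the comma-separated string with concat_ws as before, then wrap it in braces with concat and lit: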
from pyspark.sql.functions import collect_list, collect_set, concat, concat_ws, col, lit
simpleData = [("001","1122","YPIA_PROD",90000),
("002","1122","YPIA_PROD",86000),
("003","1122","YPIA_PROD",81000),
("004","1122","YPIA_ABC",90000),
("005","1133","YPIA_PROD",99000),
("006","1133","YPIA_PROD",83000),
("007","1144","YPIA_PROD",79000),
("008","1144","YPIA_PROD",80000),
("009","1144","YPIA_ABC",91000)
]
schema = ["id","item_guid","item_name","option"]
df = spark.createDataFrame(data=simpleData, schema = schema)
#df.printSchema()
df.show(truncate=False)
grouped_df = df.groupby("item_guid", "item_name").agg(collect_set("option").alias("option"))
array_to_string_df = (
    grouped_df
    .withColumn("option", concat_ws(",", col("option")))
    .select(
        col("item_guid"),
        col("item_name"),
        # wrap the joined values in braces so Postgres accepts it as a text[] literal
        concat(lit("{"), col("option"), lit("}")).alias("option"),
    )
)
array_to_string_df.show()
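An equivalent one-liner, assuming the same grouped_df as above, uses format_string, which applies a printf-style template so the braces and the joined values are built in one expression:

from pyspark.sql.functions import concat_ws, format_string, col

# "%s" is replaced by the comma-joined option values, giving e.g. "{90000,86000,81000}"
array_to_string_df = grouped_df.withColumn(
    "option", format_string("{%s}", concat_ws(",", col("option")))
)
array_to_string_df.show(truncate=False)

This avoids the extra select and the three-way concat; the resulting strings are identical.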