In the sample dataset below, I have two groups, 'A' and 'B'. The 'Description' column holds the details for group 'A' rows, and whenever a group 'B' row occurs, I need to collect the descriptions of the prior group 'A' rows into an array and put that array against group 'B' in a new dataset.
Sample dataset:
Description | Group |
---|---|
XYZ | A |
PQR | A |
 | B |
DEF | A |
HIJ | A |
KLM | A |
NOP | A |
 | B |
Expected Output:
Group | Description |
---|---|
B | [XYZ, PQR] |
B | [DEF, HIJ, KLM, NOP] |
Suppose you have a column Id that determines the order of rows.
Calculate a group number as the running count of group 'B' occurrences, then aggregate with collect_list, as in the code below. Because the window's default frame includes the current row, each 'B' row starts a new group, and the having clause drops the empty group produced by the final 'B' row. The code is Scala, but the same spark.sql statement works in PySpark:
println("Initial data:")
val df1 = Seq(
(1, "XYZ", "A"),
(2, "PQR" , "A"),
(3,null, "B" ),
(4,"DEF", "A"),
(5,"HIJ", "A"),
(6,"KLM", "A"),
(7,"NOP", "A"),
(8,null, "B" )
).toDF("Id","Description", "Group")
df1.createOrReplaceTempView("df1")
df1.show(100, false)
println("Result:")
spark.sql("""
select 'B' Group, collect_list(Description) Description
from
(
select id, Description, Group,
--calculate group number
count(case when Group='B' then 1 else null end) over(order by id) as grp_num
from df1
) s
group by grp_num
having size(collect_list(Description))>0
order by grp_num
""").show(100, false)
Initial data:
+---+-----------+-----+
|Id |Description|Group|
+---+-----------+-----+
|1 |XYZ |A |
|2 |PQR |A |
|3 |null |B |
|4 |DEF |A |
|5 |HIJ |A |
|6 |KLM |A |
|7 |NOP |A |
|8 |null |B |
+---+-----------+-----+
Result:
+-----+--------------------+
|Group|Description |
+-----+--------------------+
|B |[XYZ, PQR] |
|B |[DEF, HIJ, KLM, NOP]|
+-----+--------------------+
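For completeness, here is a minimal sketch of the same logic written with the DataFrame API instead of spark.sql. This equivalent is my addition, not part of the tested answer above; it assumes the same df1 as in the snippet:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Running count of 'B' rows ordered by Id; the default window frame
// ends at the current row, so each 'B' row opens a new group
val byId = Window.orderBy("Id")

df1
  .withColumn("grp_num", count(when(col("Group") === "B", 1)).over(byId))
  .groupBy("grp_num")
  // collect_list skips nulls, so the 'B' rows contribute nothing
  .agg(collect_list("Description").as("Description"))
  // drop the empty group opened by the trailing 'B' row
  .where(size(col("Description")) > 0)
  .orderBy("grp_num")
  .select(lit("B").as("Group"), col("Description"))
  .show(100, false)

Note that both versions pull all rows into a single partition because the window has no partitionBy, which is fine for small data like this.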