
Combine text from multiple rows based on IDs but remove consecutive duplicate entries


I have a dataframe:

id col1
1 aa
3 uy
1 bb
1 cr
1 cr
1 cr
1 qe
2 yt
2 yt
3 uy
4 po
1 cr
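
For reference, a minimal sketch to build this example dataframe (assuming an active SparkSession bound to the name spark):

    df = spark.createDataFrame(
        [(1, 'aa'), (3, 'uy'), (1, 'bb'), (1, 'cr'), (1, 'cr'), (1, 'cr'),
         (1, 'qe'), (2, 'yt'), (2, 'yt'), (3, 'uy'), (4, 'po'), (1, 'cr')],
        ['id', 'col1'])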

I was able to combine them using the collect_list method:

from pyspark.sql import functions as f

df = df.groupby("id").agg(f.concat_ws(",", f.collect_list(df.col1)).alias("col1"))

id col1
1 aa,bb,cr,cr,cr,qe,cr
2 yt,yt
3 uy,uy
4 po

But I want my final output to drop consecutive duplicate items, something like this:

id col1
1 aa,bb,cr,qe,cr
2 yt
3 uy
4 po

Solution

    from pyspark.sql import Window
    from pyspark.sql.functions import array_join, col, collect_list, lag, monotonically_increasing_id

    w = Window.partitionBy('id')

    (df.withColumn('index', monotonically_increasing_id())         # create an index to order by
       .withColumn('prev', lag('col1').over(w.orderBy('index')))   # previous col1 value within each id
       .na.fill({'prev': ''})                                      # first row of each id has no previous value
       .where(col('col1') != col('prev')).drop('index', 'prev')    # keep only rows that differ from the previous one
       .groupBy('id')
       .agg(array_join(collect_list('col1'), ',').alias('col1'))   # collect the remaining values per id and join them
       .show())
    
    +---+--------------+
    | id|          col1|
    +---+--------------+
    |  1|aa,bb,cr,qe,cr|
    |  2|            yt|
    |  3|            uy|
    |  4|            po|
    +---+--------------+
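
One caveat: collect_list does not strictly guarantee row order after the shuffle, so on very large data the joined string could come out reordered. An order-safe variant is sketched below (assuming Spark 3.0+, which the indexed filter higher-order function requires): collect (index, value) structs per id, sort the array, and drop consecutive repeats inside it.

    from pyspark.sql import functions as f

    (df.withColumn('index', f.monotonically_increasing_id())
       .groupBy('id')
       .agg(f.sort_array(f.collect_list(f.struct('index', 'col1'))).alias('rows'))  # gather rows per id, in order
       .withColumn('vals', f.expr("transform(rows, r -> r.col1)"))                  # keep only the values
       .withColumn('col1', f.expr(
           "array_join(filter(vals, (v, i) -> i = 0 OR v != vals[i - 1]), ',')"))   # drop consecutive repeats
       .drop('rows', 'vals')
       .show())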