Tags: apache-spark, pyspark, apache-spark-sql, window

How to filter rows based on window and a condition in pyspark?


I need to remove rows whose feedback is missing, but only when other rows with the same id, p_id and key_id do have feedback present. If no row in that group has feedback, the rows should be kept.

Input:

id p_id key_id feedback
1  p1   k1     happy
1  p1   k1     sad
1  p1   k2     sad
1  p1   k2     
1  p2   k3  
2  p1   k3     sad

Expected output:

id p_id key_id feedback
1  p1   k1     happy
1  p1   k1     sad
1  p1   k2     sad
1  p2   k3  
2  p1   k3     sad
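To pin down the rule, here is a small plain-Python sketch (no Spark) applied to the sample rows above. It assumes missing feedback is stored as None and treats blank strings the same way. The helper name `has_feedback` is hypothetical.

```python
from collections import defaultdict

# Sample rows from the tables above; None marks missing feedback
# (an assumption about how "missing" is represented in the real data)
rows = [
    (1, "p1", "k1", "happy"),
    (1, "p1", "k1", "sad"),
    (1, "p1", "k2", "sad"),
    (1, "p1", "k2", None),
    (1, "p2", "k3", None),
    (2, "p1", "k3", "sad"),
]


def has_feedback(value):
    """Hypothetical helper: True when feedback is present and not only whitespace."""
    return value is not None and value.strip() != ""


# First pass: record which (id, p_id, key_id) groups contain any feedback
group_has_feedback = defaultdict(bool)
for id_, p_id, key_id, feedback in rows:
    if has_feedback(feedback):
        group_has_feedback[(id_, p_id, key_id)] = True

# Second pass: drop a row only if its own feedback is missing
# AND some other row in its group does have feedback
kept = [
    row for row in rows
    if has_feedback(row[3]) or not group_has_feedback[row[:3]]
]

for row in kept:
    print(row)
```

Only the row `(1, "p1", "k2", None)` is removed. The `(1, "p2", "k3")` group has no feedback at all, so its row survives.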

How can I achieve that in pyspark?


Solution

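  • I'd compute, for each (id, p_id, key_id) group, the length of the longest feedback in a new column called max_length. Then drop the rows whose own feedback is empty while max_length is greater than 0, that is, while the group does have some feedback:

    import pyspark.sql.functions as F
    from pyspark.sql import Window as W
    
    # Length of the trimmed feedback; null (missing) feedback counts as 0
    fb_len = F.coalesce(F.length(F.trim(F.col('feedback'))), F.lit(0))
    
    # Longest feedback within each (id, p_id, key_id) group
    df = df.withColumn('max_length',
                       F.max(fb_len).over(W.partitionBy('id', 'p_id', 'key_id'))
                      )
    
    # Empty feedback in a group that does have some non-empty feedback
    cond = (F.col('max_length') > 0) & (fb_len == 0)
    
    df = df.filter(~cond).drop('max_length')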
    

    The trims strip leading and trailing spaces, so a feedback value made up only of spaces is treated as missing.