Tags: python, arrays, apache-spark, pyspark, null

Removing null values from array after merging double-type columns


I have this PySpark df:

+---------+----+----+----+----+----+----+----+----+----+                        
|partition|   1|   2|   3|   4|   5|   6|   7|   8|   9|
+---------+----+----+----+----+----+----+----+----+----+
|        7|null|null|null|null|null|null| 0.7|null|null|
|        1| 0.2| 0.1| 0.3|null|null|null|null|null|null|
|        8|null|null|null|null|null|null|null| 0.8|null|
|        4|null|null|null| 0.4| 0.5| 0.6|null|null| 0.9|
+---------+----+----+----+----+----+----+----+----+----+

from which I have combined the nine rightmost columns into an array column:

+---------+--------------------+                                                
|partition|            vec_comb|
+---------+--------------------+
|        7|      [,,,,,,,, 0.7]|
|        1|[,,,,,, 0.1, 0.2,...|
|        8|      [,,,,,,,, 0.8]|
|        4|[,,,,, 0.4, 0.5, ...|
+---------+--------------------+

How can I remove the null values from the arrays in the vec_comb column?

Expected output:

+---------+--------------------+                                                
|partition|            vec_comb|
+---------+--------------------+
|        7|               [0.7]|
|        1|     [0.1, 0.2, 0.3]|
|        8|               [0.8]|
|        4|[0.4, 0.5, 0.6, 0.9]|
+---------+--------------------+

I've tried (obviously wrong, but I can't wrap my head around this):

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

def clean_vec(array):
    new_array = []
    for element in array:
        # The original check `type(element) == FloatType()` never matches:
        # elements are plain Python floats, not Spark type objects, so the
        # correct test is simply "is this element not null?"
        if element is not None:
            new_array.append(element)
    return new_array

udf_clean_vec = F.udf(clean_vec, returnType=ArrayType(FloatType()))
df = df.withColumn('vec_comb_cleaned', udf_clean_vec('vec_comb'))

Solution

  • You can use the higher-order SQL function filter (Spark 2.4+) to remove null elements:

    import pyspark.sql.functions as F
    
    df2 = df.withColumn('vec_comb_cleaned', F.expr('filter(vec_comb, x -> x is not null)'))
    
    df2.show()
    +---------+--------------------+--------------------+
    |partition|            vec_comb|    vec_comb_cleaned|
    +---------+--------------------+--------------------+
    |        7|      [,,,,,, 0.7,,]|               [0.7]|
    |        1|[0.2, 0.1, 0.3,,,...|     [0.2, 0.1, 0.3]|
    |        8|      [,,,,,,, 0.8,]|               [0.8]|
    |        4|[,,, 0.4, 0.5, 0....|[0.4, 0.5, 0.6, 0.9]|
    +---------+--------------------+--------------------+
    

    You can use a UDF, but it will be slower, e.g.

    udf_clean_vec = F.udf(lambda x: [i for i in x if i is not None], 'array<float>')
    df2 = df.withColumn('vec_comb_cleaned', udf_clean_vec('vec_comb'))