Tags: python, apache-spark, pyspark, apache-spark-sql, apache-spark-ml

Spark: Counting NaNs in a vector column


Suppose I have a Spark DataFrame that looks like this:

+-----------+------------------+
| id        |          features|
+-----------+------------------+
|          1|[57.0,1.0,0.0,0.0]|
|          2|[63.0,NaN,0.0,0.0]|
|          3|[74.0,1.0,3.0,NaN]|
|          4|[67.0,NaN,0.0,0.0]|
|          5| [NaN,1.0,NaN,NaN]|
+-----------+------------------+

where each row in the features column is a DenseVector containing a mix of floats and NaNs. Is there a way to count the number of NaNs in the first position of the DenseVector, or in any arbitrary position? For instance, I would like something that returns that the first position has 1 NaN, the second has 2, and the fourth has 2.


Solution

  • As far as I know, Spark SQL doesn't provide a method for this out of the box, but it is trivial with the RDD API and a little bit of NumPy.

    from pyspark.ml.linalg import DenseVector, Vector
    import numpy as np
    
    # assumes an active SparkContext / SparkSession (e.g. `sc` in the pyspark shell)
    df = sc.parallelize([
        (1, DenseVector([57.0, 1.0, 0.0, 0.0])),
        (2, DenseVector([63.0, float("NaN"), 0.0, 0.0])),
        (3, DenseVector([74.0, 1.0, 3.0, float("NaN")])),
        (4, DenseVector([67.0, float("NaN"), 0.0, 0.0])),
        (5, DenseVector([float("NaN"), 1.0, float("NaN"), float("NaN")])),
    ]).toDF(["id", "features"])
    
    (df
        .select("features")
        .rdd
        .map(lambda x: np.isnan(x.features.array))
        .sum())
    
    array([1, 2, 1, 2])
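
    If you need plain Python values rather than a NumPy array, the summed result converts directly. A small usage sketch (not part of the original answer; the names counts and nan_per_position are mine, and df is the sample frame from above):

    counts = (df
        .select("features")
        .rdd
        .map(lambda x: np.isnan(x.features.array))
        .sum())

    # vector position -> number of NaNs, e.g. {0: 1, 1: 2, 2: 1, 3: 2}
    nan_per_position = {i: int(n) for i, n in enumerate(counts)}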
    

    You could do a similar thing with Spark SQL, but it requires significantly more effort. Start with a helper function that converts the vector to a plain array:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, DoubleType
    from pyspark.sql import Column
    from typing import List
    
    def as_array(c: Column) -> Column:
        def as_array_(v: Vector) -> List[float]:
            return v.array.tolist()
        return udf(as_array_, ArrayType(DoubleType()))(c)
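
    As a quick sanity check (a sketch, assuming the df defined above), the helper exposes the vector as a plain array<double> column that built-in SQL functions can index into:

    from pyspark.sql.functions import col

    # should report the features column as array<double>
    df.select(as_array(col("features")).alias("features")).printSchema()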
    

    Determine the size of the vectors (4 for the sample data above):

    from pyspark.sql.functions import col, size
    
    (vlen, ) = df.na.drop().select(size(as_array(col("features")))).first()
    

    Create an expression:

    from pyspark.sql.functions import col, isnan, sum as sum_
    
    feature_array = as_array("features").alias("features")
    

    Finally, select and sum the NaN flags per position:

    (df
        .na.drop(subset=["features"])
        .select([sum_(isnan(feature_array[i]).cast("bigint")) for i in range(vlen)]))
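
    To actually materialize the counts, finish with show() (or first()). A hedged sketch, assuming the df, vlen, and feature_array defined above; the nan_{i} aliases are my own, and with the sample data the counts should match the RDD result, i.e. 1, 2, 1, 2:

    (df
        .na.drop(subset=["features"])
        .select([
            sum_(isnan(feature_array[i]).cast("bigint")).alias("nan_{}".format(i))
            for i in range(vlen)
        ])
        .show())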