Search code examples
scalaapache-spark

How to do a not-null check for all columns of a Scala DataFrame in an effective way?


I have a DataFrame. I want to raise an exception if any of the columns contains a null. I can do it by hard-coding the column names and chaining OR conditions to check. How can I make it dynamic and performant?


Solution

  • There are several approaches to accomplish this task, and the following is one solution using the RAISE_ERROR function.

    df.show(false)
    +---+----+
    |id |name|
    +---+----+
    |1  |A   |
    |2  |B   |
    |3  |NULL|
    +---+----+
    
    // For every column, build an expression that fails fast on the first
    // null value encountered and otherwise passes the column through
    // unchanged (keeping its original name via .as).
    val checkNullExprs = df.columns.map { colName =>
      when(col(colName).isNull, raise_error(lit(s"[${colName}] has null")))
        .otherwise(col(colName))
        .as(colName)
    }

    // Selecting the wrapped columns triggers the null check row by row.
    df.select(checkNullExprs: _*).show(false)
    
    java.lang.RuntimeException: [name] has null
      at org.apache.spark.sql.catalyst.expressions.RaiseError.eval(misc.scala:90)
      at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:543)
      at org.apache.spark.sql.catalyst.expressions.If.eval(conditionalExpressions.scala:90)
      at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:158)
      at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:89)
      at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$47.$anonfun$applyOrElse$82(Optimizer.scala:2149)
      at scala.collection.immutable.List.map(List.scala:297)
      at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$47.applyOrElse(Optimizer.scala:2149)
      at org.a
    

    OR

    df.show(false)
    
    +----+----+
    |id  |name|
    +----+----+
    |1   |A   |
    |NULL|B   |
    |3   |NULL|
    +----+----+
    
    // One aggregate expression per column: COUNT(col) skips nulls, so a
    // mismatch with COUNT(*) means the column contains at least one null.
    val perColumnChecks = df.columns.map { c =>
      expr(s"""IF(COUNT(${c}) != COUNT(*), "`${c}` has nulls", NULL)""").as(c)
    }

    // Drop the NULL entries (null-free columns) and join the rest into a
    // single "[ ... ]" report string.
    val messageExpr = concat(
      lit("[ "),
      concat_ws(", ", filter(array(perColumnChecks: _*), f => f.isNotNull)),
      lit(" ]")
    ).as("message")
    
    OR
    
    // DSL equivalent of the SQL variant above: count(col) ignores nulls,
    // so count(col) =!= count("*") detects columns with nulls. Columns
    // without nulls yield NULL (no .otherwise branch).
    val nullFlagExprs = df.columns.map { name =>
      when(count(col(name)) =!= count($"*"), lit(s"`${name}` has nulls")).as(name)
    }
    val arrayExpr = array(nullFlagExprs: _*)

    // Filter out the NULL slots and wrap the remaining messages in brackets.
    val filteredFlags = filter(arrayExpr, element => element.isNotNull)
    val messageExpr = concat(
      lit("[ "),
      concat_ws(", ", filteredFlags),
      lit(" ]")
    ).as("message")
    
    
    // Aggregate once, then raise only if the report actually lists a column.
    // NOTE: when no column has nulls, concat_ws over the empty filtered
    // array yields "", so the message is "[  ]" — length 4. The threshold
    // must therefore be > 4; the previous `LENGTH(message) > 2` raised an
    // error even for a completely null-free DataFrame.
    df
    .select(messageExpr)
    .withColumn("message", expr("IF(LENGTH(message) > 4, RAISE_ERROR(message), CAST(NULL AS STRING))"))
    .select("message")
    .show(false)
    
    java.lang.RuntimeException: [ `id` has nulls, `name` has nulls ]
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.project_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)