Tags: java, apache-spark, null, apache-spark-sql

Spark SQL: how to explode without losing null values


I have a DataFrame that I am trying to flatten. As part of the process, I want to explode it, so that if I have a column of arrays, each value of the array is used to create a separate row. For instance,

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]

should become

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

This is my code:

import static org.apache.spark.sql.functions.explode;

private DataFrame explodeDataFrame(DataFrame df) {
    DataFrame resultDf = df;
    for (StructField field : df.schema().fields()) {
        if (field.dataType() instanceof ArrayType) {
            // Replace each array column with one row per array element
            resultDf = resultDf.withColumn(field.name(), explode(resultDf.col(field.name())));
            resultDf.show();
        }
    }
    return resultDf;
}

The problem is that in my data, some of the array columns contain nulls, and in that case the entire row is deleted. So this DataFrame:

id | name | likes
_______________________________
1  | Luke | [baseball, soccer]
2  | Lucy | null

becomes

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer

instead of

id | name | likes
_______________________________
1  | Luke | baseball
1  | Luke | soccer
2  | Lucy | null

How can I explode my arrays so that I don't lose the null rows?

I am using Spark 1.5.2 and Java 8.


Solution

  • Spark 2.2+

    You can use the explode_outer function:

    import org.apache.spark.sql.functions.explode_outer
    import spark.implicits._  // for the $"..." column syntax
    
    df.withColumn("likes", explode_outer($"likes")).show
    
    // +---+----+--------+
    // | id|name|   likes|
    // +---+----+--------+
    // |  1|Luke|baseball|
    // |  1|Luke|  soccer|
    // |  2|Lucy|    null|
    // +---+----+--------+
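
    Since the question is in Java: the call is the same there, with Spark 2.x's untyped DataFrame being Dataset<Row>. A minimal sketch:

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.explode_outer;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    
    // explode_outer keeps rows whose array is null, emitting a null value for them
    Dataset<Row> exploded = df.withColumn("likes", explode_outer(col("likes")));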
    

  • Spark <= 2.1

    This is Scala, but the Java equivalent should be almost identical (to import individual functions in Java, use import static); a Java sketch follows the Scala example.

    import org.apache.spark.sql.functions.{array, col, explode, lit, when}
    import spark.implicits._  // for toDF (sqlContext.implicits._ on Spark 1.x)
    
    val df = Seq(
      (1, "Luke", Some(Array("baseball", "soccer"))),
      (2, "Lucy", None)
    ).toDF("id", "name", "likes")
    
    df.withColumn("likes", explode(
      when(col("likes").isNotNull, col("likes"))
        // If null explode an array<string> with a single null
        .otherwise(array(lit(null).cast("string")))))
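
    Since the question uses the Java 1.x API, here is a sketch of the same trick as a hypothetical null-safe rewrite of the question's explodeDataFrame (names and structure taken from the question; the element type is read from the schema):

    import static org.apache.spark.sql.functions.array;
    import static org.apache.spark.sql.functions.explode;
    import static org.apache.spark.sql.functions.lit;
    import static org.apache.spark.sql.functions.when;
    
    import org.apache.spark.sql.Column;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.types.ArrayType;
    import org.apache.spark.sql.types.StructField;
    
    private DataFrame explodeDataFrame(DataFrame df) {
        DataFrame resultDf = df;
        for (StructField field : df.schema().fields()) {
            if (field.dataType() instanceof ArrayType) {
                ArrayType arrayType = (ArrayType) field.dataType();
                Column c = resultDf.col(field.name());
                resultDf = resultDf.withColumn(field.name(),
                    explode(when(c.isNotNull(), c)
                        // Replace a null array with array(null) of the element type
                        .otherwise(array(lit(null).cast(arrayType.elementType())))));
            }
        }
        return resultDf;
    }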
    

    The idea here is basically to replace NULL with an array(NULL) of the desired type. For complex types (a.k.a. structs) you have to provide the full schema:

    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    val dfStruct = Seq((1L, Some(Array((1, "a")))), (2L, None)).toDF("x", "y")
    
    val st = StructType(Seq(
      StructField("_1", IntegerType, false), StructField("_2", StringType, true)
    ))
    
    dfStruct.withColumn("y", explode(
      when(col("y").isNotNull, col("y"))
        .otherwise(array(lit(null).cast(st)))))
    

    or

    dfStruct.withColumn("y", explode(
      when(col("y").isNotNull, col("y"))
        .otherwise(array(lit(null).cast("struct<_1:int,_2:string>")))))
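
    In Java the struct schema for the first variant could be built with the types API; a sketch, assuming the same dfStruct:

    import static org.apache.spark.sql.functions.*;
    
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    StructType st = new StructType(new StructField[]{
        new StructField("_1", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("_2", DataTypes.StringType, true, Metadata.empty())
    });
    
    dfStruct.withColumn("y",
        explode(when(col("y").isNotNull(), col("y"))
            .otherwise(array(lit(null).cast(st)))));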
    

    Note:

    If the array column has been created with containsNull set to false, you should change this first (tested with Spark 2.1):

    df.withColumn("array_column", $"array_column".cast(ArrayType(SomeType, true)))