Tags: scala, apache-spark, join, left-join, inner-join

An Apache Spark Join including null keys


My objective is to join two dataframes and keep information from both, even though my join keys can contain nulls. These are my two dataframes:

import sparkSession.implicits._

val data1 = Seq(
  (601, null, null, "8121000868-10", "CN88"),
  (3925, null, null, "8121000936-50", "CN88")
)

val df1 = data1.toDF("id", "work_order_number", "work_order_item_number", "tally_number", "company_code")

val data2 = Seq(
  (null, null, "8121000868-10", "CN88", "popo"),
  (null, null, "8121000936-50", "CN88", "Smith")
)

val df2 = data2.toDF("work_order_number", "work_order_item_number", "tally_number", "company_code", "name")

My actual goal is to get the "id" from df1, rename it "tally_summary_id", and be able to re-attach some other information to every single id. This is my code:

import org.apache.spark.sql.functions.col

val final_df =
  df1.select(col("id").alias("tally_summary_id"), col("work_order_number"), col("work_order_item_number"),
    col("tally_number"), col("company_code"))
  .join(df2, Seq("tally_number", "work_order_number", "work_order_item_number", "company_code"), "full")

A left join gives me:

+-------------+-----------------+----------------------+------------+----------------+----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id|name|
+-------------+-----------------+----------------------+------------+----------------+----+
|8121000868-10|             null|                  null|        CN88|             601|null|
|8121000936-50|             null|                  null|        CN88|            3925|null|
+-------------+-----------------+----------------------+------------+----------------+----+

A right join gives me:

+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10|             null|                  null|        CN88|            null| popo|
|8121000936-50|             null|                  null|        CN88|            null|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+

A full join gives me:

+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10|             null|                  null|        CN88|             601| null|
|8121000868-10|             null|                  null|        CN88|            null| popo|
|8121000936-50|             null|                  null|        CN88|            3925| null|
|8121000936-50|             null|                  null|        CN88|            null|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+

What can I do to get something like this:

+-------------+-----------------+----------------------+------------+----------------+-----+
| tally_number|work_order_number|work_order_item_number|company_code|tally_summary_id| name|
+-------------+-----------------+----------------------+------------+----------------+-----+
|8121000868-10|             null|                  null|        CN88|             601| popo|
|8121000936-50|             null|                  null|        CN88|            3925|Smith|
+-------------+-----------------+----------------------+------------+----------------+-----+

Solution

  • You can use the <=> equality operator, which is null-safe.

    I added an explicit schema when creating the dataframes because, without it, schema inference could not assign a type to the columns containing only nulls and the join failed.

    The resulting dataframe is exactly the one you wanted. (A generic way to build the same null-safe condition from a list of key columns is sketched after the code below.)

    import scala.collection.JavaConverters._

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
    
    val data1 = Seq(
      Row(601, null, null, "8121000868-10", "CN88"),
      Row(3925, null, null, "8121000936-50", "CN88")
    )
    
    val schema1 = StructType(List(
      StructField("id", IntegerType, false),
      StructField("work_order_number", StringType, true),
      StructField("work_order_item_number", StringType, true),
      StructField("tally_number", StringType, true),
      StructField("company_code", StringType, true)
    ))
    
    val df1 = sparkSession.createDataFrame(data1.asJava, schema1)
    
    val data2 = Seq(
      Row(null, null, "8121000868-10", "CN88", "popo"),
      Row(null, null, "8121000936-50", "CN88", "Smith")
    )
    
    val schema2 = StructType(Seq(
      StructField("work_order_number", StringType, true),
      StructField("work_order_item_number", StringType, true),
      StructField("tally_number", StringType, true),
      StructField("company_code", StringType, true),
      StructField("name", StringType, false)
    ))
    
    val df2 = sparkSession.createDataFrame(data2.asJava, schema2)
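
    // Why <=> instead of ===: in Spark SQL, `null === null` evaluates to
    // null (not true), so rows whose join keys are null never match under
    // plain equality. The null-safe operator <=> treats two nulls as equal,
    // which is what lets the join below pair up the rows of df1 and df2.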
    
    
    val final_df =
      df1.join(df2, df1("tally_number") <=> df2("tally_number")
          && df1("work_order_number") <=> df2("work_order_number")
          && df1("work_order_item_number") <=> df2("work_order_item_number")
          && df1("company_code") <=> df2("company_code")
          , "inner")
        .select(df1("tally_number"),
          df1("work_order_number"),
          df1("work_order_item_number"),
          df1("company_code"),
          df1("id").as("tally_summary_id"),
          df2("name"))