Tags: sql, apache-spark

Not getting desired output on left join in Spark Scala


I have two Dataset[ReconEntity] instances, where ReconEntity is:

case class ReconEntity(rowId: String,
                       groupId: String,
                       amounts: List[Amount],
                       processingDate: Long,
                       attributes: Map[String, String],
                       entityType: String,
                       isDuplicate: String)

The first dataset looks like:

     +-----+-------+------------------+--------------+----------+-----------+
     |rowId|groupId|           amounts|processingDate|attributes|isDuplicate|
     +-----+-------+------------------+--------------+----------+-----------+
     |   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|      false|
     |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
     |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
     |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
     |   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
     |   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
     |   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
     +-----+-------+------------------+--------------+----------+-----------+

and the second dataset looks like:

     +-----+-------+------------------+--------------+----------+-----------+
     |rowId|groupId|           amounts|processingDate|attributes|isDuplicate|
     +-----+-------+------------------+--------------+----------+-----------+
     |   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
     |   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
     +-----+-------+------------------+--------------+----------+-----------+

I want the result to look like this:

     +-----+-------+------------------+--------------+----------+-----------+
     |rowId|groupId|           amounts|processingDate|attributes|isDuplicate|
     +-----+-------+------------------+--------------+----------+-----------+
     |   C1|     G1|USD,10.00000000...|    1551021334|  rowId,C1|       true|
     |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
     |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
     |   C1|     G1|USD,10.00000000...|    1551011017|  rowId,C1|       true|
     |   C2|     G2|USD,2.000000000...|    1551011017|  rowId,C2|      false|
     |   C3|     G2|USD,6.000000000...|    1551011459|  rowId,C3|      false|
     |   C3|     G2|USD,6.000000000...|    1551011017|  rowId,C3|       true|
     +-----+-------+------------------+--------------+----------+-----------+

I am joining the two datasets with a left join: if a rowId is not present in the second dataset, I mark isDuplicate as true; otherwise I keep the original value in the resulting dataset. The logic is:

// Note: TRUE and IS_DUPLICATE are constants presumably defined elsewhere,
// e.g. lit("true") and the column name "isDuplicate".
inputEntries.as("inputDataset").join(otherEntries.as("otherDataset"),
      col("inputDataset.rowId") === col("otherDataset.rowId"), "left")
      .select(
        col("inputDataset.rowId"),
        col("inputDataset.groupId"),
        col("inputDataset.amounts"),
        col("inputDataset.processingDate"),
        col("inputDataset.attributes"),
        col("inputDataset.entityType"),
        when(
          col("otherDataset.rowId").isNull, TRUE
        ).otherwise(col("inputDataset.isDuplicate")).as(IS_DUPLICATE)
      ).as[ReconEntity]

Here the join key is rowId. This logic works fine locally, but when I run the Spark job, the results are not as expected. I am not very well versed with joins, so I wanted to know whether my logic for the desired output is correct, and what the output of a left join of the two datasets would be.


Solution

  • For the given requirement, a left join is the correct choice.

    Can you share the difference you see in the Spark job output? The only issue with the current solution arises when the second dataset (otherEntries) contains duplicate rowId values: in a left join, each matching row in the first dataset is paired with every duplicate row on the right, producing multiple output entries.
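
    For illustration, here is a minimal sketch of the effect with hypothetical toy data (assuming a SparkSession `spark` and `spark.implicits._` in scope):

    val leftDf  = Seq(("C3", 1), ("C3", 2)).toDF("rowId", "n")
    val rightDf = Seq(("C3", "a"), ("C3", "b")).toDF("rowId", "tag")
    // Each of the 2 left rows matches both right rows: 2 x 2 = 4 output rows
    leftDf.join(rightDf, Seq("rowId"), "left").count()  // 4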

    Since we only need to check the rowId column, you can de-duplicate the second dataset on rowId so that each key appears at most once, avoiding duplicate matches after the join.
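
    Note that dropDuplicates("rowId") keeps one arbitrary row per key; that is safe here because only the presence of the rowId matters, not which row survives. A toy sketch with hypothetical data:

    val df = Seq(("C3", 1551011459L), ("C3", 1551011460L)).toDF("rowId", "ts")
    df.dropDuplicates("rowId").count()  // 1 row; which ts survives is not guaranteed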

    Below is working code with the de-duplication included, along with sample output for the join before and after de-duplication.

    import org.apache.spark.sql.Dataset
    import org.apache.spark.sql.functions.{col, lit, when}
    import spark.implicits._
    
    // Class definitions
    case class Amount(currency: String, value: Double)
    case class ReconEntity(
      rowId: String,
      groupId: String,
      amounts: List[Amount],
      processingDate: Long,
      attributes: Map[String, String],
      entityType: String,
      isDuplicate: String
    )
    
    // First Dataset
    val inputEntries: Dataset[ReconEntity] = Seq(
        ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551021334, Map("rowId" -> "C1"), "E1", "false"),
        ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
        ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
        ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
        ReconEntity("C2", "G2", List(Amount("USD", 2.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C2"), "E1", "false"),
        ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011459, Map("rowId" -> "C3"), "E1", "false"),
        ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C3"), "E1", "true")
    ).toDS()
    // Second Dataset (includes a duplicate for "C3")
    val otherEntries: Dataset[ReconEntity] = Seq(
        ReconEntity("C2", "G2", List(Amount("USD", 2.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C2"), "E1", "false"),
        ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011459, Map("rowId" -> "C3"), "E1", "false"),
        ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011460, Map("rowId" -> "C3"), "E1", "false")
    ).toDS()
    
    // Sample Output with left join
    inputEntries.join(otherEntries, Seq("rowId"), "left").show()
    
    // Due to duplicates in the second dataset, there are 4 rows for "C3" instead of the expected 2
    // +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
    // |rowId|groupId|             amounts|processingDate|   attributes|entityType|isDuplicate|groupId|             amounts|processingDate|   attributes|entityType|isDuplicate|
    // +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551021334|{rowId -> C1}|        E1|      false|   null|                null|          null|         null|      null|       null|
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|   null|                null|          null|         null|      null|       null|
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|   null|                null|          null|         null|      null|       null|
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|   null|                null|          null|         null|      null|       null|
    // |   C2|     G2|[{USD, 2.0}, {EUR...|    1551011017|{rowId -> C2}|        E1|      false|     G2|[{USD, 2.0}, {EUR...|    1551011017|{rowId -> C2}|        E1|      false|
    // |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|     G2|[{USD, 6.0}, {EUR...|    1551011460|{rowId -> C3}|        E1|      false|
    // |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|
    // |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011017|{rowId -> C3}|        E1|       true|     G2|[{USD, 6.0}, {EUR...|    1551011460|{rowId -> C3}|        E1|      false|
    // |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011017|{rowId -> C3}|        E1|       true|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|
    // +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
    
    
    // Create a result with de-duplication of second dataset on join key (rowId)
    val result = inputEntries.as("inputDataset").join(
        otherEntries.dropDuplicates("rowId").as("otherDataset"),
        Seq("rowId"), 
        "left"
        ).select(
            col("inputDataset.rowId"),
            col("inputDataset.groupId"),
            col("inputDataset.amounts"),
            col("inputDataset.processingDate"),
            col("inputDataset.attributes"),
            col("inputDataset.entityType"),
            when(
              col("otherDataset.rowId").isNull, lit("true")
            ).otherwise(col("inputDataset.isDuplicate")).as("isDuplicate")
          ).as[ReconEntity]
    
    // Output
    result.show()
    // +-----+-------+--------------------+--------------+-------------+----------+-----------+
    // |rowId|groupId|             amounts|processingDate|   attributes|entityType|isDuplicate|
    // +-----+-------+--------------------+--------------+-------------+----------+-----------+
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551021334|{rowId -> C1}|        E1|       true|
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|
    // |   C1|     G1|[{USD, 10.0}, {EU...|    1551011017|{rowId -> C1}|        E1|       true|
    // |   C2|     G2|[{USD, 2.0}, {EUR...|    1551011017|{rowId -> C2}|        E1|      false|
    // |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011459|{rowId -> C3}|        E1|      false|
    // |   C3|     G2|[{USD, 6.0}, {EUR...|    1551011017|{rowId -> C3}|        E1|       true|
    // +-----+-------+--------------------+--------------+-------------+----------+-----------+
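
    As a design note: since only the presence of rowId from the second dataset is consulted, a variant (a sketch under the same definitions as above, not from the original answer) joins against just the distinct keys, which keeps the join narrow and makes the de-duplication intent explicit:

    // otherRowId is a hypothetical alias introduced to avoid an ambiguous
    // rowId column after the join
    val keys = otherEntries.select(col("rowId").as("otherRowId")).distinct()

    val result2 = inputEntries
      .join(keys, col("rowId") === col("otherRowId"), "left")
      .withColumn("isDuplicate",
        when(col("otherRowId").isNull, lit("true"))
          .otherwise(col("isDuplicate")))
      .drop("otherRowId")
      .as[ReconEntity]

    result2.show()  // same 7 rows as result above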