I have 2 datasets[ReconEntity] where ReconEntity is :
case class ReconEntity(rowId: String,
groupId: String,
amounts: List[Amount],
processingDate: Long,
attributes: Map[String, String],
entityType: String,
isDuplicate: String)
First Dataset looks like :
+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId| amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
| C1| G1|USD,10.00000000...| 1551021334| rowId,C1| false|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C2| G2|USD,2.000000000...| 1551011017| rowId,C2| false|
| C3| G2|USD,6.000000000...| 1551011459| rowId,C3| false|
| C3| G2|USD,6.000000000...| 1551011017| rowId,C3| true|
+-----+-------+------------------+--------------+----------+-----------+
and the second dataset looks like :
+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId| amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
| C2| G2|USD,2.000000000...| 1551011017| rowId,C2| false|
| C3| G2|USD,6.000000000...| 1551011459| rowId,C3| false|
+-----+-------+------------------+--------------+----------+-----------+
And I want the resultant to look like this :
+-----+-------+------------------+--------------+----------+-----------+
|rowId|groupId| amount|processingDate|attributes|isDuplicate|
+-----+-------+------------------+--------------+----------+-----------+
| C1| G1|USD,10.00000000...| 1551021334| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C1| G1|USD,10.00000000...| 1551011017| rowId,C1| true|
| C2| G2|USD,2.000000000...| 1551011017| rowId,C2| false|
| C3| G2|USD,6.000000000...| 1551011459| rowId,C3| false|
| C3| G2|USD,6.000000000...| 1551011017| rowId,C3| true|
+-----+-------+------------------+--------------+----------+-----------+
I am joining the 2 datasets using left join and if the rowId is not present in second dataset, I am marking the value of isDuplicate flag as true, otherwise the original value in the resultant dataset. The logic is:
inputEntries.as("inputDataset").join(otherEntries.as("otherDataset"),
col("inputDataset.rowId") === col("otherDataset.rowId"), "left")
.select(
col("inputDataset.rowId"),
col("inputDataset.groupId"),
col("inputDataset.amounts"),
col("inputDataset.processingDate"),
col("inputDataset.attributes"),
col("inputDataset.entityType"),
when(
col("otherDataset.rowId").isNull, TRUE
).otherwise(col("inputDataset.isDuplicate")).as(IS_DUPLICATE)
).as[ReconEntity]
Here the joinKey is rowId.This logic is working fine in local, but when I am trying to run the spark job, the results are not as expected. I am not very well versed with joins, wanted to know whether my logic for the desired output is correct or not. And what would be the output of left join of the 2 datasets.
For the given requirement, the left join will be the correct choice.
Can you share the difference you see in the spark-job ouput? The only issue with the current solution will be when you have duplicates in the second dataset (otherEntries). In this case, each row in first dataset will get matched with the duplicate row, creating multiple entries.
Since we only need to check for rowId
column, you can use a deduplicate
operation to keep only unique rows in the second dataset to avoid duplicates after the join.
Below is a working code, with the de-deduplicate included and sample output for the join before and after de-deduplication.
import org.apache.spark.sql.Dataset
import spark.implicits._
// Class definitions
case class Amount(currency: String, value: Double)
case class ReconEntity(
rowId: String,
groupId: String,
amounts: List[Amount],
processingDate: Long,
attributes: Map[String, String],
entityType: String,
isDuplicate: String
)
// First Dataset
val inputEntries: Dataset[ReconEntity] = Seq(
ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551021334, Map("rowId" -> "C1"), "E1", "false"),
ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
ReconEntity("C1", "G1", List(Amount("USD", 10.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C1"), "E1", "true"),
ReconEntity("C2", "G2", List(Amount("USD", 2.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C2"), "E1", "false"),
ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011459, Map("rowId" -> "C3"), "E1", "false"),
ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C3"), "E1", "true")
).toDS()
// Second Dataset (includes a duplicate for "C3")
val otherEntries: Dataset[ReconEntity] = Seq(
ReconEntity("C2", "G2", List(Amount("USD", 2.0), Amount("EUR", 50.0)), 1551011017, Map("rowId" -> "C2"), "E1", "false"),
ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011459, Map("rowId" -> "C3"), "E1", "false"),
ReconEntity("C3", "G2", List(Amount("USD", 6.0), Amount("EUR", 50.0)), 1551011460, Map("rowId" -> "C3"), "E1", "false")
).toDS()
// Sample Output with left join
inputEntries.join(otherEntries, Seq("rowId"), "left").show()
// Due to duplicates in second dataset, there are 4 rows for "C3" instead of 2 expected
// +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
// |rowId|groupId| amounts|processingDate| attributes|entityType|isDuplicate|groupId| amounts|processingDate| attributes|entityType|isDuplicate|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
// | C1| G1|[{USD, 10.0}, {EU...| 1551021334|{rowId -> C1}| E1| false| null| null| null| null| null| null|
// | C1| G1|[{USD, 10.0}, {EU...| 1551011017|{rowId -> C1}| E1| true| null| null| null| null| null| null|
// | C1| G1|[{USD, 10.0}, {EU...| 1551011017|{rowId -> C1}| E1| true| null| null| null| null| null| null|
// | C1| G1|[{USD, 10.0}, {EU...| 1551011017|{rowId -> C1}| E1| true| null| null| null| null| null| null|
// | C2| G2|[{USD, 2.0}, {EUR...| 1551011017|{rowId -> C2}| E1| false| G2|[{USD, 2.0}, {EUR...| 1551011017|{rowId -> C2}| E1| false|
// | C3| G2|[{USD, 6.0}, {EUR...| 1551011459|{rowId -> C3}| E1| false| G2|[{USD, 6.0}, {EUR...| 1551011460|{rowId -> C3}| E1| false|
// | C3| G2|[{USD, 6.0}, {EUR...| 1551011459|{rowId -> C3}| E1| false| G2|[{USD, 6.0}, {EUR...| 1551011459|{rowId -> C3}| E1| false|
// | C3| G2|[{USD, 6.0}, {EUR...| 1551011017|{rowId -> C3}| E1| true| G2|[{USD, 6.0}, {EUR...| 1551011460|{rowId -> C3}| E1| false|
// | C3| G2|[{USD, 6.0}, {EUR...| 1551011017|{rowId -> C3}| E1| true| G2|[{USD, 6.0}, {EUR...| 1551011459|{rowId -> C3}| E1| false|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+-------+--------------------+--------------+-------------+----------+-----------+
// Create a result with de-duplication of second dataset on join key (rowId)
val result = inputEntries.as("inputDataset").join(
otherEntries.dropDuplicates("rowId").as("otherDataset"),
Seq("rowId"),
"left"
).select(
col("inputDataset.rowId"),
col("inputDataset.groupId"),
col("inputDataset.amounts"),
col("inputDataset.processingDate"),
col("inputDataset.attributes"),
col("inputDataset.entityType"),
when(
col("otherDataset.rowId").isNull, lit("true")
).otherwise(col("inputDataset.isDuplicate")).as("isDuplicate")
).as[ReconEntity]
// Output
result.show()
// +-----+-------+--------------------+--------------+-------------+----------+-----------+
// |rowId|groupId| amounts|processingDate| attributes|entityType|isDuplicate|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+
// | C1| G1|[{USD, 10.0}, {EU...| 1551021334|{rowId -> C1}| E1| true|
// | C1| G1|[{USD, 10.0}, {EU...| 1551011017|{rowId -> C1}| E1| true|
// | C1| G1|[{USD, 10.0}, {EU...| 1551011017|{rowId -> C1}| E1| true|
// | C1| G1|[{USD, 10.0}, {EU...| 1551011017|{rowId -> C1}| E1| true|
// | C2| G2|[{USD, 2.0}, {EUR...| 1551011017|{rowId -> C2}| E1| false|
// | C3| G2|[{USD, 6.0}, {EUR...| 1551011459|{rowId -> C3}| E1| false|
// | C3| G2|[{USD, 6.0}, {EUR...| 1551011017|{rowId -> C3}| E1| true|
// +-----+-------+--------------------+--------------+-------------+----------+-----------+