scala, apache-spark, join, dataset, rdd

Scala join different datasets to get value for one column


I am new to Scala and have three tables.

A:

Marketplace  Level    Band
US           LEVEL_1
CA           LEVEL_1  BAND_1

B:

Marketplace  Level    Value
US           LEVEL_1  10

C:

Marketplace  Level    Band    Value
CA           LEVEL_1  BAND_1  20

I want to:

For rows in table A with Marketplace = US, join table B on Seq(Marketplace, Level) to get the Value;

For rows in table A with Marketplace = CA, join table C on Seq(Marketplace, Level, Band) to get the Value.

The output table should look like this:

Marketplace  Level    Band    Value
US           LEVEL_1          10
CA           LEVEL_1  BAND_1  20

How should I write Scala code to achieve this? Thanks!


Solution

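  • import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.coalesce
    
    // Local session so the snippet also runs outside spark-shell;
    // in spark-shell, `spark` already exists and this line can be skipped.
    val spark = SparkSession.builder().master("local[*]").appName("join-example").getOrCreate()
    import spark.implicits._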
    val A = Seq(("US", "LEVEL_1", ""), ("CA", "LEVEL_1", "BAND_1"))
      .toDF("Marketplace", "Level", "Band")
    val B = Seq(("US", "LEVEL_1", 10)).toDF("Marketplace", "Level", "Value")
    val C = Seq(("CA", "LEVEL_1", "BAND_1", 20)).toDF(
      "Marketplace",
      "Level",
      "Band",
      "Value"
    )
    
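    // Join B on (Marketplace, Level) and C on (Marketplace, Level, Band).
    // Joining on Marketplace alone would duplicate rows as soon as a
    // marketplace has more than one level or band.
    val res = A
      .join(
        B,
        A.col("Marketplace") === B.col("Marketplace") &&
          A.col("Level") === B.col("Level"),
        "left"
      )
      .join(
        C,
        A.col("Marketplace") === C.col("Marketplace") &&
          A.col("Level") === C.col("Level") &&
          A.col("Band") === C.col("Band"),
        "left"
      )
      .select(
        A.col("Marketplace"),
        A.col("Level"),
        A.col("Band"), // A's own Band, so the US row keeps its empty string
        // first non-null Value, preferring B over C
        coalesce(B.col("Value"), C.col("Value")).alias("Value")
      )
    
    res.show(false)
    //    +-----------+-------+------+-----+
    //    |Marketplace|Level  |Band  |Value|
    //    +-----------+-------+------+-----+
    //    |US         |LEVEL_1|      |10   |
    //    |CA         |LEVEL_1|BAND_1|20   |
    //    +-----------+-------+------+-----+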
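
An alternative that follows the wording of the question more literally is to split A by marketplace, join each part on its own key list, and union the results. The following is only a minimal sketch under a few assumptions: it rebuilds the same sample data as above, it creates a local session (in spark-shell the existing `spark` can be used instead), and the names `usRows`, `caRows` and `result` are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("split-join-union").getOrCreate()
import spark.implicits._

// Same sample data as in the question.
val A = Seq(("US", "LEVEL_1", ""), ("CA", "LEVEL_1", "BAND_1"))
  .toDF("Marketplace", "Level", "Band")
val B = Seq(("US", "LEVEL_1", 10)).toDF("Marketplace", "Level", "Value")
val C = Seq(("CA", "LEVEL_1", "BAND_1", 20))
  .toDF("Marketplace", "Level", "Band", "Value")

// US rows look up Value in B by (Marketplace, Level).
val usRows = A
  .filter(col("Marketplace") === "US")
  .join(B, Seq("Marketplace", "Level"), "left")

// CA rows look up Value in C by (Marketplace, Level, Band).
val caRows = A
  .filter(col("Marketplace") === "CA")
  .join(C, Seq("Marketplace", "Level", "Band"), "left")

// unionByName matches columns by name rather than by position.
val result = usRows.unionByName(caRows)

result.show(false)
```

Joining with a `Seq` of column names keeps a single copy of each key column, so no `select` is needed to drop duplicates. Note that rows of A whose marketplace is neither US nor CA are dropped by the two filters; if they should be kept, they would need a third branch in the union.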