
Spark/Scala - Join resultset giving type mismatch error while performing withColumn operation; found : org.apache.spark.sql.Column required: Boolean


I have two DataFrames that I am trying to join. Based on the joined result, I want to add a flag column.

demandDF1

+--------+-----------+---------+
|rgn_nm  |file_crt_dt|file_vrsn|
+--------+-----------+---------+
|DAO     |2022-06-30 |1        |
|DAO     |2022-06-30 |1        |
|CCC     |2022-06-30 |1        |
|APCC    |2022-06-30 |1        |
|ODM     |2022-06-29 |3        |
|EMF     |2022-06-30 |1        |
|T2Region|2022-06-29 |4        |
|BCC     |2022-06-30 |1        |
|EMF     |2022-07-01 |1        |
+--------+-----------+---------+

outputDistinctDF

+------+-----------+---------+
|region|file_crt_dt|file_vrsn|
+------+-----------+---------+
|DAO   |2022-06-30 |1        |
|CCC   |2022-06-29 |1        |
|APCC  |2022-06-30 |1        |
|ODM   |2022-06-29 |2        |
|EMF   |2022-06-30 |1        |
|BCC   |2022-06-30 |1        |
+------+-----------+---------+

I am trying to produce the following result:

+------------+-----------------+---------------+-------------+------------------+----------------+----+
|input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
+------------+-----------------+---------------+-------------+------------------+----------------+----+
|DAO         |2022-06-30       |1              |DAO          |2022-06-30        |1               |0   |
|CCC         |2022-06-30       |1              |CCC          |2022-06-29        |1               |1   |
|T2Region    |2022-06-29       |4              |null         |null              |null            |1   |
|ODM         |2022-06-29       |3              |ODM          |2022-06-29        |2               |1   |
|APCC        |2022-06-30       |1              |APCC         |2022-06-30        |1               |0   |
|EMF         |2022-07-01       |1              |EMF          |2022-06-30        |1               |1   |
|EMF         |2022-06-30       |1              |EMF          |2022-06-30        |1               |0   |
|BCC         |2022-06-30       |1              |BCC          |2022-06-30        |1               |0   |
+------------+-----------------+---------------+-------------+------------------+----------------+----+

The logic:

(input_file_crt_dt > output_file_crt_dt ) or
(input_file_crt_dt = output_file_crt_dt and input_file_vrsn > output_file_vrsn) or
(output_region is null)

then flag = 1 else 0

I have tried the following code, but it fails with the error: type mismatch; found: org.apache.spark.sql.Column, required: Boolean

Steps that I have followed:

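// Assumes a SparkSession named spark is in scope (as in spark-shell).
import org.apache.spark.sql.functions.{col, lit, when}
import spark.implicits._ // needed for toDF

val demandDF1 = Seq(("DAO","2022-06-30","1"),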
("DAO","2022-06-30","1"),
("CCC","2022-06-30","1"),
("APCC","2022-06-30","1"),
("ODM","2022-06-29","3"),
("EMF","2022-06-30","1"),
("T2Region","2022-06-29","4"),
("BCC","2022-06-30","1"),
("EMF","2022-07-01","1")).toDF("rgn_nm","file_crt_dt","file_vrsn").withColumn("file_crt_dt", col("file_crt_dt").cast("date")).withColumn("file_vrsn", col("file_vrsn").cast("int"))

val outputDistinctDF = Seq(("DAO","2022-06-30","1"),
("CCC","2022-06-29","1"),
("APCC","2022-06-30","1"),
("ODM","2022-06-29","2"),
("EMF","2022-06-30","1"),
("BCC","2022-06-30","1")).toDF("region","file_crt_dt","file_vrsn").withColumn("file_crt_dt", col("file_crt_dt").cast("date")).withColumn("file_vrsn", col("file_vrsn").cast("int"))

val inputDistinctDF = demandDF1.select(col("rgn_nm"), col("file_crt_dt"), col("file_vrsn")).distinct()

val resultantDF = inputDistinctDF.join(outputDistinctDF,
  inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region")
  , "left_outer").select(inputDistinctDF.col("rgn_nm") as "input_region",
  inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
  inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
  outputDistinctDF.col("region") as "output_region",
  outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
  outputDistinctDF.col("file_vrsn") as "output_file_vrsn"
).withColumn("flag",
  when((
    col("output_region").isNull || col("input_file_crt_dt").gt(col("output_file_crt_dt")) || ( col("input_file_crt_dt").eq(col("output_file_crt_dt")) && col("input_file_vrsn").gt(col("output_file_vrsn")) )
    ), lit("1")).otherwise(lit("0")))

Solution

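  • The culprit is .eq in your condition. In Scala, calling .eq on a Column resolves to AnyRef.eq (reference equality), which returns a plain Boolean rather than a Column. The && that follows is therefore Boolean's &&, which expects a Boolean argument and rejects the Column you pass it; that is the "found: Column, required: Boolean" error. Use === (or .equalTo) for column equality instead. I've put the condition in a separate variable to keep it readable, and with that change everything works.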

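    // Assumes a SparkSession named spark is in scope; spark.implicits._ enables the $"..." syntax.
    import org.apache.spark.sql.functions.when
    import spark.implicits._

    val cond = (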
      ($"input_file_crt_dt" > $"output_file_crt_dt") ||
      (($"input_file_crt_dt" === $"output_file_crt_dt") &&
       ($"input_file_vrsn" > $"output_file_vrsn")) ||
      $"output_region".isNull
    )
    
    val resultantDF = inputDistinctDF
      .join(
        outputDistinctDF,
        inputDistinctDF.col("rgn_nm") === outputDistinctDF.col("region"),
        "left_outer")
      .select(
        inputDistinctDF.col("rgn_nm") as "input_region",
        inputDistinctDF.col("file_crt_dt") as "input_file_crt_dt",
        inputDistinctDF.col("file_vrsn") as "input_file_vrsn",
        outputDistinctDF.col("region") as "output_region",
        outputDistinctDF.col("file_crt_dt") as "output_file_crt_dt",
        outputDistinctDF.col("file_vrsn") as "output_file_vrsn")
      .withColumn("flag", when(cond, "1").otherwise("0"))
    
    resultantDF.show()
    // +------------+-----------------+---------------+-------------+------------------+----------------+----+
    // |input_region|input_file_crt_dt|input_file_vrsn|output_region|output_file_crt_dt|output_file_vrsn|flag|
    // +------------+-----------------+---------------+-------------+------------------+----------------+----+
    // |         BCC|       2022-06-30|              1|          BCC|        2022-06-30|               1|   0|
    // |        APCC|       2022-06-30|              1|         APCC|        2022-06-30|               1|   0|
    // |    T2Region|       2022-06-29|              4|         null|              null|            null|   1|
    // |         EMF|       2022-06-30|              1|          EMF|        2022-06-30|               1|   0|
    // |         ODM|       2022-06-29|              3|          ODM|        2022-06-29|               2|   1|
    // |         CCC|       2022-06-30|              1|          CCC|        2022-06-29|               1|   1|
    // |         EMF|       2022-07-01|              1|          EMF|        2022-06-30|               1|   1|
    // |         DAO|       2022-06-30|              1|          DAO|        2022-06-30|               1|   0|
    // +------------+-----------------+---------------+-------------+------------------+----------------+----+
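
To see why .eq triggers the type mismatch, here is a minimal sketch with no Spark dependency. Expr is a hypothetical stand-in for org.apache.spark.sql.Column (it is not part of Spark), defined only to show how the two operators resolve. The assumption is that Column behaves the same way here, since eq is inherited from AnyRef and is not something a class can override.

```scala
// Hypothetical stand-in for org.apache.spark.sql.Column (illustration only).
// Like Column, its operators build a new expression instead of evaluating anything.
final case class Expr(sql: String) {
  def ===(other: Expr): Expr = Expr(s"($sql = ${other.sql})")
  def >(other: Expr): Expr   = Expr(s"($sql > ${other.sql})")
  def &&(other: Expr): Expr  = Expr(s"($sql AND ${other.sql})")
  def ||(other: Expr): Expr  = Expr(s"($sql OR ${other.sql})")
}

object EqVsTripleEquals {
  val inDt: Expr    = Expr("input_file_crt_dt")
  val outDt: Expr   = Expr("output_file_crt_dt")
  val inVrsn: Expr  = Expr("input_file_vrsn")
  val outVrsn: Expr = Expr("output_file_vrsn")

  // AnyRef.eq compares object references right away and yields a Boolean.
  val reference: Boolean = inDt.eq(outDt)

  // === builds an expression that can be combined with && and ||.
  val expression: Expr = inDt === outDt

  // Compiles: every operand is an Expr.
  val cond: Expr = (inDt > outDt) || ((inDt === outDt) && (inVrsn > outVrsn))

  // Does NOT compile: the left operand is a Boolean, so && is Boolean's &&,
  // which requires a Boolean argument but is given an Expr.
  // val broken = inDt.eq(outDt) && (inVrsn > outVrsn)

  def main(args: Array[String]): Unit = {
    println(reference)      // false: two distinct objects
    println(expression.sql) // (input_file_crt_dt = output_file_crt_dt)
  }
}
```

Uncommenting the broken line reproduces the same kind of "type mismatch ... required: Boolean" message, which is why swapping .eq for === is enough to fix the original withColumn call.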