Search code examples
scalaapache-sparkapache-spark-sqlapache-spark-dataset

Spark dataset failed to resolve column after multiple join


Suppose I have these case class

case class Employee(id: Long, proj_id: Long, office_id: Long, salary: Long)

case class Renumeration(id: Long, amount: Long)

I'd like to update a collection of Employee based on Renumeration using Spark

val right: Dataset[Renumeration]  = ???
val left: Dataset[Employee] = ???

left.joinWith(broadcast(right),left("proj_id") === right("id"),"leftouter")
.map { case(left,right) => updateProj(left,right) }
.joinWith(broadcast(right),left("office_id") === right("id"),"leftouter")
.map { case(left,right) => updateOffice(left,right) }

def updateProj(emp: Employee; ren: Renumeration): Employee = //business logic
def updateOffice(emp: Employee; ren: Renumeration): Employee = //business logic

The first join and map works, however when I introduce the second join Spark failed to resolve the id column and showed these instead.

org.apache.spark.sql.AnalysisException: Resolved attribute(s) office_id#42L missing from id#114L,salary#117L,id#34L,amount#35L,proj_id#115L,office_id#116L in operator !Join LeftOuter, (office_id#42L = id#34L). Attribute(s) with the same name appear in the operation: office_id. Please check if the right attribute(s) are used.;;
!Join LeftOuter, (office_id#42L = id#34L)
:- SerializeFromObject [assertnotnull(assertnotnull(input[0, Employee, true])).id AS id#114L, assertnotnull(assertnotnull(input[0, Employee, true])).proj_id AS proj_id#115L, assertnotnull(assertnotnull(input[0, Employee, true])).office_id AS office_id#116L, assertnotnull(assertnotnull(input[0, Employee, true])).salary AS salary#117L]
:  +- MapElements <function1>, class scala.Tuple2, [StructField(_1,StructType(StructField(id,LongType,false), StructField(proj_id,LongType,false), StructField(office_id,LongType,false), StructField(salary,LongType,false)),true), StructField(_2,StructType(StructField(id,LongType,false), StructField(amount,LongType,false)),true)], obj#113: Employee
:     +- DeserializeToObject newInstance(class scala.Tuple2), obj#112: scala.Tuple2
:        +- Join LeftOuter, (_1#103.proj_id = _2#104.id)
:           :- Project [named_struct(id, id#40L, proj_id, proj_id#41L, office_id, office_id#42L, salary, salary#43L) AS _1#103]
:           :  +- LocalRelation <empty>, [id#40L, proj_id#41L, office_id#42L, salary#43L]
:           +- Project [named_struct(id, id#34L, amount, amount#35L) AS _2#104]
:              +- ResolvedHint (broadcast)
:                 +- LocalRelation <empty>, [id#34L, amount#35L]
+- ResolvedHint (broadcast)
   +- LocalRelation <empty>, [id#34L, amount#35L]

Any idea why Spark could not resolve the column even though I already used the typed Dataset? Also what should I do to make this work if possible?


Solution

  • The error is being caused because the reference returned by left("office_id") no longer exists in the new projected dataset(i.e. the dataset resulting from the first join and map operation).

    If you look closer at the execution plan in the nested relation

    : +- LocalRelation <empty>, [id#40L, proj_id#41L, office_id#42L, salary#43L]

    you can observe that the reference to office_id in the left dataset is office_id#42L. However, if you look at the later execution, you will notice that this reference no longer exists in the projection

    SerializeFromObject [assertnotnull(assertnotnull(input[0, Employee, true])).id AS id#114L, assertnotnull(assertnotnull(input[0, Employee, true])).proj_id AS proj_id#115L, assertnotnull(assertnotnull(input[0, Employee, true])).office_id AS office_id#116L, assertnotnull(assertnotnull(input[0, Employee, true])).salary AS salary#117L]

    as the office_id reference available is office_id#116L.

    In order to resolve this, you could use intermediary/temporary variables eg:

    val right: Dataset[Renumeration]  = ???
    val left: Dataset[Employee] = ???
    
    val leftTemp = left.joinWith(broadcast(right),left("proj_id") === right("id"),"leftouter")
    .map { case(left,right) => updateProj(left,right) }
    
    val leftFinal = leftTemp.joinWith(broadcast(right),leftTemp("office_id") === right("id"),"leftouter")
    .map { case(left,right) => updateOffice(left,right) }
    

    or you could try using the following shorthand $"office_id" === right("id") in your join eg

    left.joinWith(broadcast(right),left("proj_id") === right("id"),"leftouter")
    .map { case(left,right) => updateProj(left,right) }
    .joinWith(broadcast(right),$"office_id" === right("id"),"leftouter")
    .map { case(left,right) => updateOffice(left,right) }
    

    Let me know if this works for you.