Tags: apache-spark, join, apache-spark-sql, rdd

Efficiently get joined and not-joined data of a DataFrame against another DataFrame


I have two DataFrames, let's say A and B. They have different schemas.

I want the records from DataFrame A that join with B on a key, and I also want the records that didn't join.

Can this be done in a single query? Going over the same data twice would hurt performance. DataFrame A is much bigger than B, but B will still be around 50-100 GB, so I can't broadcast it.

I am okay with getting a single DataFrame C as the result, with a partition column "Joined" whose values "Yes" or "No" signify whether each record of A joined with B or not.

What if A has duplicates and I don't want them? I was thinking I'd do a reduceByKey later on the C DataFrame. Any suggestions around that?

I am using Hive tables to store the data in the ORC file format on HDFS, and writing code in Scala.


Solution

  • Yes, you just need to do a left-outer join:

    import sqlContext.implicits._
    
    val A = sc.parallelize(List(("id1", 1234), ("id1", 1234), ("id3", 5678))).toDF("id1", "number")
    val B = sc.parallelize(List(("id1", "Hello"), ("id2", "world"))).toDF("id2", "text")
    
    // After a left outer join, B's columns are null for unmatched rows,
    // so a null join key from B means the row of A found no match.
    val joined = udf((id: String) => id match {
      case null => "No"
      case _    => "Yes"
    })
    
    val C = A
      .distinct                               // drop duplicate rows in A
      .join(B, 'id1 === 'id2, "left_outer")   // keep all rows of A
      .withColumn("joined", joined('id2))     // flag matched vs. unmatched
      .drop("id2")                            // remove B's columns from the result
      .drop("text")
    

    This will yield a dataframe C:[id1: string, number: int, joined: string] that looks like this:

    [id1,1234,Yes]
    [id3,5678,No]
    

    Note that I have added a distinct to filter out duplicates in A, and that the last column in C indicates whether or not the row was joined.

    EDIT: Following a remark from the OP, I have added the drop calls to remove the columns coming from B.
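
    The flag logic above can be sanity-checked on plain Scala collections, no Spark required. Here `A` and `B` are ordinary in-memory stand-ins for the DataFrames: `distinct` removes A's duplicate rows first, then each row is flagged by whether its key has a match in B.

    ```scala
    // In-memory sketch of the left-outer-join + "joined"-flag approach.
    // A holds (id, number) pairs (with a duplicate); B maps id -> text.
    val A = List(("id1", 1234), ("id1", 1234), ("id3", 5678))
    val B = Map("id1" -> "Hello", "id2" -> "world")

    // Deduplicate A, then flag each row by whether its key exists in B.
    val C = A.distinct.map { case (id, number) =>
      (id, number, if (B.contains(id)) "Yes" else "No")
    }
    // C: List((id1,1234,Yes), (id3,5678,No))
    ```

    As a side note, on Spark 2.x the UDF can be replaced with the built-in column expression `when('id2.isNull, "No").otherwise("Yes")`, which avoids serializing a Scala closure to the executors.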