I have two DataFrames, let's say A and B, with different schemas.
I want to get the records from DataFrame A that join with B on a key, and I also want the records that don't get joined.
Can this be done in a single query? Going over the same data twice would hurt performance. DataFrame A is much bigger than B, and B will be around 50 GB to 100 GB, so I can't broadcast B.
I am okay with getting a single DataFrame C as a result, which can have a partition column "Joined" with values "Yes" or "No", signifying whether each record in A was joined with B.
What if A has duplicates that I don't want? I was thinking of doing a reduceByKey on C later. Any suggestions around that?
I am using Hive tables to store the data in ORC format on HDFS, and I am writing the code in Scala.
Yes, you just need to do a left-outer join:
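import sqlContext.implicits._
import org.apache.spark.sql.functions.udf
// Sample data: A contains an exact duplicate row
val A = sc.parallelize(List(("id1", 1234),("id1", 1234),("id3", 5678))).toDF("id1", "number")
val B = sc.parallelize(List(("id1", "Hello"),("id2", "world"))).toDF("id2", "text")
// After a left outer join, id2 is null exactly when the A row found no match in B
val joined = udf((id: String) => id match {
  case null => "No"
  case _ => "Yes"
})
val C = A
  .distinct                                // drop duplicate rows of A before joining
  .join(B, 'id1 === 'id2, "left_outer")    // keep every row of A, matched or not
  .withColumn("joined", joined('id2))      // flag whether a match was found
  .drop('id2)                              // remove B's columns from the result
  .drop('text)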
This will yield a DataFrame C with schema [id1: string, number: int, joined: string] that looks like this:
[id1,1234,Yes]
[id3,5678,No]
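To make the semantics of this pipeline concrete without a cluster, here is a sketch in plain Scala collections (no Spark involved). The names a, b, c and bByKey are illustrative; the sketch only models what A.distinct followed by a left outer join and the Yes/No flag computes on the sample data, and says nothing about how Spark actually distributes the join.

```scala
// Plain-Scala model of: A.distinct.join(B, 'id1 === 'id2, "left_outer") plus a Yes/No flag.
// Illustrates the semantics only; it does not reflect how Spark executes the join.

// Sample rows mirroring A and B above (A contains an exact duplicate).
val a: List[(String, Int)] = List(("id1", 1234), ("id1", 1234), ("id3", 5678))
val b: List[(String, String)] = List(("id1", "Hello"), ("id2", "world"))

// Index B by its key so each A row can be probed with one lookup.
val bByKey: Map[String, List[String]] =
  b.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

// Left outer join: every A row survives. It is tagged "Yes" if some B row has the
// same key and "No" otherwise. A row matching several B rows is repeated once per
// match, as in a SQL left outer join. B's columns are dropped, as in the answer.
val c: List[(String, Int, String)] = a.distinct.flatMap { case (id, number) =>
  bByKey.get(id) match {
    case Some(texts) => texts.map(_ => (id, number, "Yes"))
    case None        => List((id, number, "No"))
  }
}

c.foreach(println)
```

Run as a Scala script, it prints the same two rows as C above, formatted as tuples.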
Note that I have added a distinct to filter out duplicates in A, and that the last column in C records whether or not the row was joined.
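One caveat on the duplicates question: distinct only removes rows that are identical in every column. If "duplicates" means several rows with the same key but different values, you need to keep one row per key, which is what the reduceByKey idea would do. The sketch below contrasts the two in plain Scala; the sample rows and the "keep the largest number" rule are hypothetical, so pick whatever survivor rule fits your data. On the DataFrame side, dropDuplicates with a list of key columns (e.g. A.dropDuplicates(Seq("id1"))) keeps one row per key but does not let you choose which one, and doing it on A before the join, as the answer does with distinct, means the join never sees the extra rows.

```scala
// Contrast whole-row de-duplication (like DataFrame.distinct) with keeping one row per key
// (roughly what a reduceByKey on the join key would achieve).

// Hypothetical sample: "id1" appears three times, with two different numbers.
val rows: List[(String, Int)] = List(("id1", 1234), ("id1", 1234), ("id1", 9999), ("id3", 5678))

// distinct only collapses rows that are equal in every column.
val wholeRowDeduped: List[(String, Int)] = rows.distinct

// One row per key: keep the row with the largest number. This survivor rule is an
// assumption; sortBy just makes the output order deterministic.
val keyDeduped: List[(String, Int)] =
  rows.groupBy(_._1).values.map(_.maxBy(_._2)).toList.sortBy(_._1)

println(wholeRowDeduped)
println(keyDeduped)
```

Here distinct still leaves two "id1" rows, while the per-key version leaves exactly one.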
EDIT: Following a remark from the OP, I have added the drop lines to remove the columns from B.