I am using Scala with Spark 1.5.

Given two DataFrames, DataFrame1 and DataFrame2, I want to look up values in DataFrame2 for the keys in DataFrame1 and create DataFrame3 with the result. What makes this case unusual is that DataFrame1 has several keys in each row, and the output DataFrame should have the keys and their looked-up values populated in the same order, as shown in the output DataFrame below. I'm looking for a distributed solution if possible, since this has to run on millions of records (~10 million). Any direction on how to proceed, or pointers to useful methods, would be a great help. Thanks in advance!

Input: DataFrame1 (contract information)
contract_id, cust1_id, cust2_id, cust3_id, cust4_id
500001,100000001,100000002,100000003,100000004
500305,100000001,100000002,100000007
500303,100000021
500702,110000045
500304,100000021,100000051,120000051
503001,540000012,510000012,500000002,510000002
503051,880000045
Input: DataFrame2 (Customer master lookup information)
cust_id,date_of_birth
100000001,1988-11-04
100000002,1955-11-16
100000003,1980-04-14
100000004,1980-09-26
100000007,1942-03-07
100000021,1964-06-22
100000051,1920-03-12
120000051,1973-11-17
110000045,1955-11-16
880000045,1980-04-14
540000012,1980-09-26
510000012,1973-03-15
500000002,1958-08-18
510000002,1942-03-07
Output: DataFrame3 (contracts with each customer's date of birth)
contract_id, cust1_id, cust2_id, cust3_id, cust4_id, cust1_dob, cust2_dob, cust3_dob, cust4_dob
500001,100000001,100000002,100000003,100000004,1988-11-04,1955-11-16,1980-04-14,1980-09-26
500305,100000001,100000002,100000007, ,1988-11-04,1955-11-16,1942-03-07, 
500303,100000021, , , ,1964-06-22, , , 
500702,110000045, , , ,1955-11-16, , , 
500304,100000021,100000051,120000051, ,1964-06-22,1920-03-12,1973-11-17, 
503001,540000012,510000012,500000002,510000002,1980-09-26,1973-03-15,1958-08-18,1942-03-07
503051,880000045, , , ,1980-04-14, , , 
This may not be the most efficient solution, but it works for your case.
// Spark 1.5: SparkSession only arrived in Spark 2.0, so build an SQLContext
// from the SparkContext (sc in the shell) and import its implicits for .toDF()
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
// DataFrame1: contracts, with up to four customer ids per row ("" marks a missing customer)
val df1 = sc
  .parallelize(Seq(
    ("500001", "100000001", "100000002", "100000003", "100000004"),
    ("500305", "100000001", "100000002", "100000007", ""),
    ("500303", "100000021", "", "", ""),
    ("500702", "110000045", "", "", ""),
    ("500304", "100000021", "100000051", "120000051", ""),
    ("503001", "540000012", "510000012", "500000002", "510000002"),
    ("503051", "880000045", "", "", "")
  ))
  .toDF("contract_id", "cust1_id", "cust2_id", "cust3_id", "cust4_id")
// DataFrame2: the customer master lookup
val df2 = sc
  .parallelize(Seq(
    ("100000001", "1988-11-04"),
    ("100000002", "1955-11-16"),
    ("100000003", "1980-04-14"),
    ("100000004", "1980-09-26"),
    ("100000007", "1942-03-07"),
    ("100000021", "1964-06-22"),
    ("100000051", "1920-03-12"),
    ("120000051", "1973-11-17"),
    ("110000045", "1955-11-16"),
    ("880000045", "1980-04-14"),
    ("540000012", "1980-09-26"),
    ("510000012", "1973-03-15"),
    ("500000002", "1958-08-18"),
    ("510000002", "1942-03-07")
  ))
  .toDF("cust_id", "date_of_birth")
// Left-join the lookup once per customer-id column: drop the duplicated
// join key each time and rename date_of_birth to the matching dob column.
val finalDF = df1
  .join(df2, df1("cust1_id") === df2("cust_id"), "left")
  .drop("cust_id")
  .withColumnRenamed("date_of_birth", "cust1_dob")
  .join(df2, df1("cust2_id") === df2("cust_id"), "left")
  .drop("cust_id")
  .withColumnRenamed("date_of_birth", "cust2_dob")
  .join(df2, df1("cust3_id") === df2("cust_id"), "left")
  .drop("cust_id")
  .withColumnRenamed("date_of_birth", "cust3_dob")
  .join(df2, df1("cust4_id") === df2("cust_id"), "left")
  .drop("cust_id")
  .withColumnRenamed("date_of_birth", "cust4_dob")

// Unmatched left joins leave nulls; replace them with empty strings for display.
finalDF.na.fill("").show()
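
If the number of customer columns grows (or you just want to avoid writing each join out by hand), the same logic can be folded over the column prefixes. Since df2 is a small lookup table, the broadcast hint (org.apache.spark.sql.functions.broadcast, available since Spark 1.5.0) asks the planner to ship it to every executor instead of shuffling the ~10 million contract rows. A minimal sketch, assuming df1 and df2 as defined above:

import org.apache.spark.sql.functions.broadcast

// Each fold step is one of the joins above: left-join the broadcast lookup,
// drop its key, and rename date_of_birth after the column being resolved.
val resolvedDF = Seq("cust1", "cust2", "cust3", "cust4").foldLeft(df1) { (df, c) =>
  df.join(broadcast(df2), df(s"${c}_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", s"${c}_dob")
}

resolvedDF.na.fill("").show()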