I have two Spark DataFrames:
df1
+--------------------+--------------------------------------------------------+
|addressId           |sorted_city_ids                                         |
+--------------------+--------------------------------------------------------+
|AkhRMTbGbPiUnrBdSlov|[732916, 734241]                                        |
|AkhTMKHi9Ui7DHcspbfg|[724985, 725983, 725603, 728894, 728896, 728943, 729422]|
|AkhflxpBdi1tTZmmTf1w|[729103, 731732, 731736, 731738]                        |
+--------------------+--------------------------------------------------------+
df2
+--------------------+-------+------------------------------+-------------------+
|addressId           |city_id|city_master_id                |dateInserted       |
+--------------------+-------+------------------------------+-------------------+
|AkhRMTbGbPiUnrBdSlov|732916 |ncLsR29SYDIeyc9aZp1cYojGvSkkWf|2021-11-04 05:30:00|
|AkhTMKHi9Ui7DHcspbfg|9361852|mt9WV7omxa8nxD1n7yOGFDtEOTWPdq|2021-10-19 05:30:00|
|AkhflxpBdi1tTZmmTf1w|729103 |kZiy4ZeULOqqdO8yKDmsDA13RdegC2|2022-11-04 20:50:57|
|AkhflxpBdi1tTZmmTf1w|731732 |msEfdZemxa8nxD1n7yOGFDtEOTWPdq|2022-11-04 20:50:57|
|AkhflxpBdi1tTZmmTf1w|731736 |kZiy4ZeULOqqdO8yKDmsDA13RdegC2|2022-11-04 20:50:57|
|AkhflxpBdi1tTZmmTf1w|731738 |kZiy4ZeULOqqdO8yKDmsDA13RdegC2|2022-11-05 20:50:57|
+--------------------+-------+------------------------------+-------------------+
I need to process each row in df1. For each city_id in the sorted_city_ids array column, I need to look up its city_master_id and dateInserted in df2, and then check whether all the city_ids in the row have the same city_master_id and the same dateInserted (date only, ignoring the time).
For example, in row 3 of df1 I need to compare 731732, 731736 and 731738 individually with 729103 (the first city in the array): each city_id should have the same city_master_id and the same date as 729103.
Expected output for row 3:
731732
731738
731732 is reported because its city_master_id does not match, and 731738 because its date does not match.
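To make the rule concrete, here is a plain-Python sketch of the comparison on the sample rows. It assumes df2 fits in memory as a dict keyed by (addressId, city_id), so it only illustrates the logic and does not scale to large data. The names `DF2_ROWS`, `LOOKUP` and `unmatched_cities` are hypothetical.

```python
from datetime import datetime

# Sample rows from df2: (addressId, city_id, city_master_id, dateInserted)
DF2_ROWS = [
    ("AkhRMTbGbPiUnrBdSlov", 732916, "ncLsR29SYDIeyc9aZp1cYojGvSkkWf", "2021-11-04 05:30:00"),
    ("AkhTMKHi9Ui7DHcspbfg", 9361852, "mt9WV7omxa8nxD1n7yOGFDtEOTWPdq", "2021-10-19 05:30:00"),
    ("AkhflxpBdi1tTZmmTf1w", 729103, "kZiy4ZeULOqqdO8yKDmsDA13RdegC2", "2022-11-04 20:50:57"),
    ("AkhflxpBdi1tTZmmTf1w", 731732, "msEfdZemxa8nxD1n7yOGFDtEOTWPdq", "2022-11-04 20:50:57"),
    ("AkhflxpBdi1tTZmmTf1w", 731736, "kZiy4ZeULOqqdO8yKDmsDA13RdegC2", "2022-11-04 20:50:57"),
    ("AkhflxpBdi1tTZmmTf1w", 731738, "kZiy4ZeULOqqdO8yKDmsDA13RdegC2", "2022-11-05 20:50:57"),
]

# Hypothetical lookup: (addressId, city_id) -> (city_master_id, date part of dateInserted)
LOOKUP = {
    (addr, city): (master, datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date())
    for addr, city, master, ts in DF2_ROWS
}


def unmatched_cities(address_id, sorted_city_ids):
    """Return the cities whose master id or insert date differs from the first city's.

    Raises KeyError if a city has no row in df2 (e.g. 734241 in the sample).
    """
    first = LOOKUP[(address_id, sorted_city_ids[0])]
    # Comparing (master_id, date) tuples flags a city if either value differs
    return [
        city for city in sorted_city_ids[1:]
        if LOOKUP[(address_id, city)] != first
    ]


print(unmatched_cities("AkhflxpBdi1tTZmmTf1w", [729103, 731732, 731736, 731738]))
# [731732, 731738]
```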
I don't know how to easily fetch the city_master_id and dateInserted of a given city in order to compare it with the other cities.
I tried to build a map of the form cityId -> List(city_master_id, date), i.e. a Map from each city ID to its (city_master_id, date) pair, using collect().toMap(). However, the data is huge, so collect() makes the driver run out of heap space.
Could someone please provide some leads?
(Updated the description for clarity.)
Update: As per our discussion in the comments, you need to compare every city in the array with the first city on both city_master_id and dateInserted, and collect the city_ids of the cities that don't match.
This would work:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# One window per address, ordered by each city's position in sorted_city_ids
w = Window.partitionBy(F.col("df1.addressId")).orderBy(F.col("df1.index"))
(df1.select("addressId", F.posexplode("sorted_city_ids").alias("index", "city_id"))
    .alias("df1")
    # Join each exploded city to its df2 row, keeping only the date part of dateInserted
    .join(df2.withColumn("dateInserted", F.to_date(F.col("dateInserted"))).alias("df2"),
          [F.col("df1.addressId") == F.col("df2.addressId"), F.col("df1.city_id") == F.col("df2.city_id")])
    # Attach the first city's date and master id to every row of the address
    .withColumn("firstDateInserted", F.first("dateInserted").over(w))
    .withColumn("firstCityMasterId", F.first("city_master_id").over(w))
    # city_id when the master id or the date differs from the first city's, otherwise null
    .withColumn("unMatchingCity", F.when((F.col("city_master_id") != F.col("firstCityMasterId")) | (F.col("dateInserted") != F.col("firstDateInserted")), F.col("df1.city_id")))
    .select("df1.city_id", "df1.addressId", "df2.dateInserted", "df2.city_master_id", "unMatchingCity")
    .show(truncate=False))
To get the unmatched cities as a list per address:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# One window per address, ordered by each city's position in sorted_city_ids
w = Window.partitionBy(F.col("df1.addressId")).orderBy(F.col("df1.index"))
(df1.select("addressId", F.posexplode("sorted_city_ids").alias("index", "city_id"))
    .alias("df1")
    .join(df2.withColumn("dateInserted", F.to_date(F.col("dateInserted"))).alias("df2"),
          [F.col("df1.addressId") == F.col("df2.addressId"), F.col("df1.city_id") == F.col("df2.city_id")])
    # First city's id, date and master id, repeated on every row (plain first() after groupBy is not ordered)
    .withColumn("firstCityId", F.first("df1.city_id").over(w))
    .withColumn("firstDateInserted", F.first("dateInserted").over(w))
    .withColumn("firstCityMasterId", F.first("city_master_id").over(w))
    .withColumn("unMatchingCity", F.when((F.col("city_master_id") != F.col("firstCityMasterId")) | (F.col("dateInserted") != F.col("firstDateInserted")), F.col("df1.city_id")))
    # One row per address; collect_list skips the nulls left by matching cities
    .groupBy("df1.addressId")
    .agg(F.first("firstCityId").alias("city_id"), F.collect_list("unMatchingCity").alias("UnMatchedCities"),
         F.first("firstCityMasterId").alias("city_master_id"), F.first("firstDateInserted").alias("dateInserted"))
    .show(truncate=False))
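As a sanity check of what the grouped query should return on the sample rows, here is a plain-Python emulation of its semantics: posexplode, inner join, first city by array position, and a per-address list of unmatched cities. It is not Spark code; the helper `emulate_grouped_query` and the `DF1`/`DF2` literals are hypothetical, and dateInserted is assumed to be already truncated to a date, as `F.to_date` does.

```python
from datetime import date

# Sample df1 rows: (addressId, sorted_city_ids)
DF1 = [
    ("AkhRMTbGbPiUnrBdSlov", [732916, 734241]),
    ("AkhTMKHi9Ui7DHcspbfg", [724985, 725983, 725603, 728894, 728896, 728943, 729422]),
    ("AkhflxpBdi1tTZmmTf1w", [729103, 731732, 731736, 731738]),
]
# Sample df2 keyed by (addressId, city_id) -> (city_master_id, date-only dateInserted)
DF2 = {
    ("AkhRMTbGbPiUnrBdSlov", 732916): ("ncLsR29SYDIeyc9aZp1cYojGvSkkWf", date(2021, 11, 4)),
    ("AkhTMKHi9Ui7DHcspbfg", 9361852): ("mt9WV7omxa8nxD1n7yOGFDtEOTWPdq", date(2021, 10, 19)),
    ("AkhflxpBdi1tTZmmTf1w", 729103): ("kZiy4ZeULOqqdO8yKDmsDA13RdegC2", date(2022, 11, 4)),
    ("AkhflxpBdi1tTZmmTf1w", 731732): ("msEfdZemxa8nxD1n7yOGFDtEOTWPdq", date(2022, 11, 4)),
    ("AkhflxpBdi1tTZmmTf1w", 731736): ("kZiy4ZeULOqqdO8yKDmsDA13RdegC2", date(2022, 11, 4)),
    ("AkhflxpBdi1tTZmmTf1w", 731738): ("kZiy4ZeULOqqdO8yKDmsDA13RdegC2", date(2022, 11, 5)),
}


def emulate_grouped_query(df1, df2):
    """Return {addressId: (first_city, unmatched_cities, city_master_id, dateInserted)}."""
    result = {}
    for address_id, cities in df1:
        # Inner join: cities without a df2 row are silently dropped; array order is kept
        joined = [(c, df2[(address_id, c)]) for c in cities if (address_id, c) in df2]
        if not joined:
            continue  # no joined rows, so the address produces no group at all
        first_city, first_values = joined[0]
        # The first city compares equal to itself, so it never appears in the list
        unmatched = [c for c, values in joined if values != first_values]
        result[address_id] = (first_city, unmatched) + first_values
    return result


for address_id, row in emulate_grouped_query(DF1, DF2).items():
    print(address_id, row)
```

Two consequences of the inner join are visible here: 734241 has no df2 row and is dropped without being reported, and AkhTMKHi9Ui7DHcspbfg disappears from the result entirely. If the first city of an array is missing from df2, the "first city" becomes the first one that did join. Also note that in Spark the element order of `collect_list` is not guaranteed after a shuffle, whereas this emulation keeps array order.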