dataframe, scala, apache-spark

Iterate records in spark dataframe by fetching values from another dataframe


I have 2 spark dataframes:

df1

+--------------------+---------------------------------------------------------+
|addressId           |sorted_city_ids                                          |
+--------------------+---------------------------------------------------------+
|AkhRMTbGbPiUnrBdSlov|[732916, 734241]                                         |
|AkhTMKHi9Ui7DHcspbfg|[724985, 725983, 725603, 728894, 728896, 728943, 729422] |
|AkhflxpBdi1tTZmmTf1w|[729103, 731732, 731736, 731738]                         |

df2

+--------------------+---------+------------------------------+-------------------+
|addressId           |city_id  |city_master_id                |dateInserted       |
+--------------------+---------+------------------------------+-------------------+
|AkhRMTbGbPiUnrBdSlov|732916   |ncLsR29SYDIeyc9aZp1cYojGvSkkWf|2021-11-04 05:30:00|
|AkhTMKHi9Ui7DHcspbfg|9361852  |mt9WV7omxa8nxD1n7yOGFDtEOTWPdq|2021-10-19 05:30:00|
|AkhflxpBdi1tTZmmTf1w|729103   |kZiy4ZeULOqqdO8yKDmsDA13RdegC2|2022-11-04 20:50:57|
|AkhflxpBdi1tTZmmTf1w|731732   |msEfdZemxa8nxD1n7yOGFDtEOTWPdq|2022-11-04 20:50:57|
|AkhflxpBdi1tTZmmTf1w|731736   |kZiy4ZeULOqqdO8yKDmsDA13RdegC2|2022-11-04 20:50:57|
|AkhflxpBdi1tTZmmTf1w|731738   |kZiy4ZeULOqqdO8yKDmsDA13RdegC2|2022-11-05 20:50:57|

I need to process each row in df1. For each city_id in the sorted_city_ids array column, I need to fetch its corresponding city_master_id and dateInserted from df2 and verify that all city_ids in the row share the same city_master_id and the same dateInserted (comparing only the date, not the time).

For example, in row 3 of df1 I need to compare 731732, 731736, and 731738 individually against 729103 (the first city in the array): each city_id should have the same city_master_id and date as 729103.

Expected output for row 3:

731732 
731738 

731732 because its city_master_id does not match, and 731738 because its date does not match.
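For concreteness, the per-row check I am after can be sketched in plain Python. The lookup dict below is illustrative only, with values hand-copied from the df2 rows for addressId AkhflxpBdi1tTZmmTf1w:

```python
from datetime import date

# Hypothetical in-memory lookup: city_id -> (city_master_id, date part of dateInserted),
# copied from the df2 rows shown above.
lookup = {
    729103: ("kZiy4ZeULOqqdO8yKDmsDA13RdegC2", date(2022, 11, 4)),
    731732: ("msEfdZemxa8nxD1n7yOGFDtEOTWPdq", date(2022, 11, 4)),
    731736: ("kZiy4ZeULOqqdO8yKDmsDA13RdegC2", date(2022, 11, 4)),
    731738: ("kZiy4ZeULOqqdO8yKDmsDA13RdegC2", date(2022, 11, 5)),
}

def unmatched_cities(sorted_city_ids, lookup):
    """Return the city_ids whose (city_master_id, date) differ from the first city's."""
    first = lookup[sorted_city_ids[0]]
    return [c for c in sorted_city_ids[1:] if lookup[c] != first]

# unmatched_cities([729103, 731732, 731736, 731738], lookup) -> [731732, 731738]
```

The problem is doing this at scale without pulling df2 onto the driver.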

I don't know how to easily fetch the city_master_id and dateInserted for a given city to compare with the other cities. I tried to build a map along the lines of (cityId -> List(city_master_id, date)), i.e. a Map(String, List(String, String)), using collect() and toMap, but as the data is huge, the driver fails with a heap-space error on collect().

Could someone please provide some leads?

I have updated the description for better understanding.


Solution

  • Update: As per our discussion in comments

    You need to compare all cities in the array with the first city based on city_master_id and dateInserted, and collect the city_ids of the cities that do not match.

    This would work:

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # Window over each addressId, ordered by the city's position in the array,
    # so F.first picks the values of the first city in sorted_city_ids.
    first_window = Window.partitionBy(F.col("df1.addressId")).orderBy("df1.index")

    df1.select("addressId", F.posexplode("sorted_city_ids").alias("index", "city_id"))\
        .alias("df1")\
        .join(df2.withColumn("dateInserted", F.to_date(F.col("dateInserted"))).alias("df2"),
              [F.col("df1.addressId") == F.col("df2.addressId"),
               F.col("df1.city_id") == F.col("df2.city_id")])\
        .withColumn("firstDateInserted", F.first("dateInserted").over(first_window))\
        .withColumn("firstCityMasterId", F.first("city_master_id").over(first_window))\
        .withColumn("unMatchingCity",
                    F.when((F.col("city_master_id") != F.col("firstCityMasterId"))
                           | (F.col("dateInserted") != F.col("firstDateInserted")),
                           F.col("df1.city_id")))\
        .select("df1.city_id", "df1.addressId", "df2.dateInserted", "df2.city_master_id", "unMatchingCity")\
        .show(truncate=False)
    

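If the window expression is unfamiliar: `F.first(...).over(Window.partitionBy("addressId").orderBy("index"))` gives every row the value from the lowest-index row of its addressId partition. A rough plain-Python equivalent, using toy rows rather than the real data:

```python
# Toy rows standing in for the joined df1-df2 result (hypothetical values).
rows = [
    {"addressId": "A", "index": 1, "dateInserted": "2022-11-05"},
    {"addressId": "A", "index": 0, "dateInserted": "2022-11-04"},
    {"addressId": "B", "index": 0, "dateInserted": "2022-10-19"},
]

# first(...) over partitionBy(addressId).orderBy(index): the value from the
# lowest-index row of each partition is attached to every row of that partition.
first_in_partition = {}
for row in sorted(rows, key=lambda r: (r["addressId"], r["index"])):
    first_in_partition.setdefault(row["addressId"], row["dateInserted"])

for row in rows:
    row["firstDateInserted"] = first_in_partition[row["addressId"]]
```

Every city in an address can then be compared against these `first*` columns row by row, with no driver-side collect.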

    Get as List:

    df1.select("addressId", F.posexplode("sorted_city_ids").alias("index", "city_id"))\
        .alias("df1")\
        .join(df2.withColumn("dateInserted", F.to_date(F.col("dateInserted"))).alias("df2"),
              [F.col("df1.addressId") == F.col("df2.addressId"),
               F.col("df1.city_id") == F.col("df2.city_id")])\
        .withColumn("firstDateInserted",
                    F.first("dateInserted").over(Window.partitionBy(F.col("df1.addressId")).orderBy("df1.index")))\
        .withColumn("firstCityMasterId",
                    F.first("city_master_id").over(Window.partitionBy(F.col("df1.addressId")).orderBy("df1.index")))\
        .withColumn("unMatchingCity",
                    F.when((F.col("city_master_id") != F.col("firstCityMasterId"))
                           | (F.col("dateInserted") != F.col("firstDateInserted")),
                           F.col("df1.city_id")))\
        .groupBy("df1.addressId")\
        .agg(F.first("df1.city_id").alias("city_id"),
             F.collect_list("unMatchingCity").alias("UnMatchedCities"),
             F.first("firstCityMasterId").alias("city_master_id"),
             F.first("dateInserted").alias("dateInserted"))\
        .show(truncate=False)
    

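The groupBy/collect_list step collapses the per-city rows back to one row per addressId. Note that `collect_list` skips nulls, which is why the cities that do match (whose `unMatchingCity` is null) drop out of the list; also, `collect_list` makes no ordering guarantee in general. A plain-Python sketch of that aggregation, over hypothetical rows:

```python
from collections import defaultdict

# Hypothetical per-city rows after the window step:
# (addressId, index, city_id, unMatchingCity or None)
rows = [
    ("AkhflxpBdi1tTZmmTf1w", 0, 729103, None),
    ("AkhflxpBdi1tTZmmTf1w", 1, 731732, 731732),
    ("AkhflxpBdi1tTZmmTf1w", 2, 731736, None),
    ("AkhflxpBdi1tTZmmTf1w", 3, 731738, 731738),
]

# groupBy(addressId) + collect_list(unMatchingCity): nulls are not collected.
grouped = defaultdict(list)
for address_id, _, _, unmatching in rows:
    if unmatching is not None:
        grouped[address_id].append(unmatching)
```

For the row-3 example above this yields `[731732, 731738]`, matching the expected output in the question.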