Tags: python, apache-spark, pyspark, apache-spark-sql

Get a missing value for a column in one DataFrame from another DataFrame


I have a DataFrame 'persons' as below:

name    age serial_no   mail
John    25  100483      [email protected]
Sam     49  448900      [email protected]
Will    63              [email protected]
Robert  20  299011  
Hill    78              [email protected]

I have another DataFrame 'people' as below:

name    s_no        e_mail
John    100483      [email protected]
Sam     448900      [email protected]
Will    229809      [email protected]                
Robert  299011  
Hill    567233      [email protected]

I need to fill in the missing serial_no and mail values in the persons dataframe by joining it with the people dataframe. If serial_no is missing in persons, I need to join persons with people on the mail column (mail = e_mail) to get the serial_no. If mail is missing in persons, I need to join persons with people on the serial_no column (serial_no = s_no) to get the mail. If no matching value is found, "NA" should be filled in.

My final df should look like this:

name    age serial_no   mail
John    25  100483      [email protected]
Sam     49  448900      [email protected]
Will    63  229809      [email protected]            
Robert  20  299011      NA
Hill    78  567233      [email protected]

This is my code snippet

final_df = persons.join(people, (persons[serial_no] == people[s_no]) \
                    & (persons['serial_no'].isNull() | (persons['serial_no'] == people['s_no'])), how='left', \
                    persons['serial_no'], people['serial_no'], persons['mail'].alias('persons_mail'), \
                    people['e_mail'].alias('people_mail'], \
                    when(people['serial_no.people'].isNull(), "NA").otherwise(people['serial_no.people']).alias('serial_no_people'])))
                    

I'm stuck here and my code is not working. Can you please suggest how to proceed?


Solution

  • persons can be left-joined with people twice, enriching one field per join. In Scala (it can probably be converted to Python easily):

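    // Assumes a SparkSession named spark (as in spark-shell)
    import org.apache.spark.sql.functions.{coalesce, lit}
    import spark.implicits._

    val persons = Seq(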
      ("John", 25, Some(100483), "[email protected]"),
      ("Sam", 49, Some(448900), "[email protected]"),
      ("Will", 63, None, "[email protected]"),
      ("Robert", 20, Some(299011), null),
      ("Hill", 78, None, "[email protected]")
    ).toDF(
      "name", "age", "serial_no", "mail"
    )
    val people = Seq(
      ("John", 100483, "[email protected]"),
      ("Sam", 448900, "[email protected]"),
      ("Will", 229809, "[email protected]"),
      ("Robert", 299011, null),
      ("Hill", 567233, "[email protected]")
    ).toDF(
      "name", "s_no", "e_mail"
    )
    // Pass 1: left join on mail to fill a missing serial_no from people.s_no
    val serialsEnriched = persons
      .join(people, $"mail" === $"e_mail", "left")
      .select(
        persons.col("name"),
        $"age",
        coalesce($"serial_no", $"s_no", lit("NA")).alias("serial_no"),
        $"mail"
      )
    // Pass 2: left join on serial_no to fill a missing mail from people.e_mail
    val mailEnriched = serialsEnriched
      .join(people, $"serial_no" === $"s_no", "left")
      .select(
        persons.col("name"),
        $"age",
        $"serial_no",
        coalesce($"mail", $"e_mail", lit("NA")).alias("mail")
      )
    

    Output for mailEnriched:

    +------+---+---------+-----------------+
    |name  |age|serial_no|mail             |
    +------+---+---------+-----------------+
    |John  |25 |100483   |[email protected]|
    |Sam   |49 |448900   |[email protected]|
    |Will  |63 |229809   |[email protected]|
    |Robert|20 |299011   |NA               |
    |Hill  |78 |567233   |[email protected]|
    +------+---+---------+-----------------+
    

    Note: if people can contain duplicate e_mail or s_no values, each duplicate match produces an extra output row, so people would need to be deduplicated on the join key before each join.