
Mapping in PySpark without RDD.collectAsMap


I have two DataFrames. The first contains MaterialNumber and HierarchyNumber (18 digits) plus almost 200 other columns. The second is a mapping file that contains HierarchyNumber and Description. My task is to split the HierarchyNumber in the first DataFrame into five levels (Level1 to Level5), each a prefix of a specific number of digits, which I do like this:

from pyspark.sql.functions import col, substring

# Each level is a prefix of the 18-digit HierarchyNumber (3, 6, 10, 14, 18 digits)
df = df_material.withColumn("Level1", substring(col("HierarchyNumber"), 1, 3)) \
    .withColumn("Level2", substring(col("HierarchyNumber"), 1, 6)) \
    .withColumn("Level3", substring(col("HierarchyNumber"), 1, 10)) \
    .withColumn("Level4", substring(col("HierarchyNumber"), 1, 14)) \
    .withColumn("Level5", col("HierarchyNumber"))

Now I would like to replace each level's code (the 3-digit code in Level1, the 6-digit code in Level2, and so on) with the matching Description from the second DataFrame. I have always used RDD.collectAsMap for this, but it is not whitelisted in my environment, so I need another PySpark solution. Any suggestions?

The first table looks like this:

materialnumber  hierarchynumber     col3 ... col100
1               001915762470792000
2               003411831611043000
3               002653955128061000

After my initial transformation, it looks like this:

materialnumber  hierarchynumber     L1   L2      L3          L4              L5                  ... col100
1               001915762470792000  001  001915  0019157624  00191576247079  001915762470792000
2               003411831611043000  003  003411  0034118316  00341183161104  003411831611043000
3               002653955128061000  002  002653  0026539551  00265395512806  002653955128061000

The mapping DataFrame (df2) looks like this:

hierarchynumber  description
001              brand
002              subbrand
001915           entity

and so on. I want to replace the codes in L1 to L5 with their descriptions.


Solution

  • You can join df2 with df for each level like this:

    from pyspark.sql.functions import coalesce, col

    # Use a left join plus coalesce so that each level gets its description
    # when one exists and keeps its original number otherwise.
    # If every number is guaranteed to have a description, you can omit
    # coalesce and use an inner join instead.
    for i in range(1, 6):
        level = f'Level{i}'
        # Rename the mapping key so it matches the level column being joined
        renamed_df2 = df2.withColumnRenamed('hierarchynumber', level)
        df = df.join(renamed_df2, level, "left")\
               .withColumn(level, coalesce(col('description'), col(level)))\
               .drop('description')
    
    df.show()
    
    +------------------+--------------+----------+------+--------+--------------+------------------+
    |            Level5|        Level4|    Level3|Level2|  Level1|materialnumber|   hierarchynumber|
    +------------------+--------------+----------+------+--------+--------------+------------------+
    |001915762470792000|00191576247079|0019157624|entity|   brand|             1|001915762470792000|
    |003411831611043000|00341183161104|0034118316|003411|     003|             2|003411831611043000|
    |002653955128061000|00265395512806|0026539551|002653|subbrand|             3|002653955128061000|
    +------------------+--------------+----------+------+--------+--------------+------------------+