python, pyspark

Make a distinct dataframe based on a column with prioritization condition


I want to deduplicate a dataframe so that each product_number appears only once. When several rows share the same product_number, the product_name to keep should be chosen by priority.

Here is the prioritization: Product A > B > C

In this table, some product_number values appear more than once with different product_name values.

[Image: input table]

How can I reduce it to distinct rows like this?

[Image: expected output table]

For product_number 1000003, A has been prioritized.

For product_number 1000005, A has been prioritized.

For product_number 1000006, B has been prioritized.

Note: The real product names are not actually A, B, and C, and there are more than 3 of them.
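
To make the rule concrete, here is a minimal plain-Python sketch (no Spark involved) of the selection described above. The sample rows and the names `PRIORITY_ORDER` and `pick_by_priority` are made up for illustration; only the priority order A > B > C and the three expected winners come from the question.

```python
# Priority order from the question, highest first; extend the list for more names.
PRIORITY_ORDER = ["A", "B", "C"]

# Hypothetical sample rows (product_number, product_name); not the real data.
rows = [
    ("1000003", "A"),
    ("1000003", "B"),
    ("1000005", "C"),
    ("1000005", "A"),
    ("1000005", "B"),
    ("1000006", "C"),
    ("1000006", "B"),
]


def pick_by_priority(rows, priority_order):
    """Keep one name per product_number: the one earliest in priority_order."""
    rank = {name: i for i, name in enumerate(priority_order)}
    best = {}
    for number, name in rows:
        if name not in rank:
            continue  # names without a defined priority are skipped
        if number not in best or rank[name] < rank[best[number]]:
            best[number] = name
    return best


result = pick_by_priority(rows, PRIORITY_ORDER)
print(result)
```

Because the priority is just a position in a list, the same idea should carry over to any number of product names.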


Solution

  • You can try the approach below:

    Example:

    1. Create a dataframe with sample data:
    from pyspark.sql import Row

    # `spark` is assumed to be an existing SparkSession (e.g. in a notebook or the pyspark shell).
    df_pn = spark.createDataFrame([
        Row(product_number="1000001", product_name="coil"),
        Row(product_number="1000002", product_name="mouse"),
        Row(product_number="1000003", product_name="coil"),
        Row(product_number="1000003", product_name="laptop"),
        Row(product_number="1000004", product_name="coil"),
        Row(product_number="1000005", product_name="mouse"),
        Row(product_number="1000005", product_name="coil"),
        Row(product_number="1000005", product_name="laptop"),
        Row(product_number="1000006", product_name="mouse"),
        Row(product_number="1000006", product_name="laptop"),
        Row(product_number="1000007", product_name="laptop")
    ])
    
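    2. Create a dict that maps each product name to its priority (lower number = higher priority):
    # Integers, not strings, so that min() compares numerically (as strings, "10" < "2").
    product_map = { "coil" : 1, "mouse" : 3, "laptop" : 2 }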
    
    3. Map each product name in the dataframe to its priority using the dict:
    from itertools import chain
    from pyspark.sql.functions import create_map, lit

    # Build a Spark map literal from the dict: key1, value1, key2, value2, ...
    mapping_exprsn = create_map([lit(x) for x in chain(*product_map.items())])

    # A null product_name yields a null priority, so no separate null branch is needed.
    result = df_pn.withColumn('product_priority', mapping_exprsn[df_pn['product_name']])

    # show() works everywhere; display() is only available in Databricks notebooks.
    result.show()
    
    4. Register the result as a temporary view:
    result.createOrReplaceTempView("test")
    
    5. Use the query below (for example via spark.sql) to get the final result:
    select test.product_number, test.product_name
    from test
    join (
        select product_number, min(product_priority) as product_priority
        from test
        group by product_number
    ) test_agg
    on test.product_number = test_agg.product_number
    where test.product_priority = test_agg.product_priority