PySpark custom sort with only some values known, more efficient than a UDF

My code works, but I am looking for a more efficient method than a UDF, because my DataFrame is huge and the UDF may slow it down.

from pyspark.sql import functions as F
df = spark.createDataFrame(
[(1, 'a', 'A'),
 (2, 'a', 'B'),
 (3, 'a', 'C'),
 (4, 'b', 'A'),
 (5, 'b', 'B'),
 (6, 'b', 'C'),
 (7, 'c', 'A'),
 (8, 'c', 'B'),
 (9, 'c', 'C')],
['id', 'c1', 'c2'])

from itertools import chain

from collections import defaultdict

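custom_dict = {'c': 0, 'a': 1}
# Define the custom order as a defaultdict seeded with custom_dict;
# values not listed there fall back to infinity and sort last
my_Order = defaultdict(lambda: float('inf'), custom_dict)

# Create a UDF to get the custom order value for each value in the column
# (declared as double so the result sorts numerically, not as a string)
get_order = F.udf(lambda x: float(my_Order[x]), 'double')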

# Add a new column with the custom order value
df = df.withColumn("order_value", get_order(F.col("c1")))

# Order the DataFrame based on the custom order value
df = df.orderBy("order_value")

I also tried the following instead of the UDF, but it threw an error because getItem can't be used with a defaultdict:

df = df.withColumn("order_value", F.when(F.col("c1").isin(list(my_order.keys())), my_Order.getItem(F.col("c1")).otherwise(float('inf'))))


  • Transform custom_dict into a CASE expression with an ELSE branch for the default value.

    stmt = "case c1 "
    for k,v in custom_dict.items():
        stmt = stmt + f" when '{k}' then {v}"
    stmt = stmt + " else cast('Infinity' as double) end"


    For the custom_dict above, stmt ends up as:

    case c1  when 'c' then 0 when 'a' then 1 else cast('Infinity' as double) end

    Then add a column using this statement and order the DataFrame by it:

    df.withColumn("order_value", F.expr(stmt)) \
      .orderBy("order_value") \
      .show()


    +---+---+---+-----------+
    | id| c1| c2|order_value|
    +---+---+---+-----------+
    |  7|  c|  A|        0.0|
    |  9|  c|  C|        0.0|
    |  8|  c|  B|        0.0|
    |  1|  a|  A|        1.0|
    |  3|  a|  C|        1.0|
    |  2|  a|  B|        1.0|
    |  4|  b|  A|   Infinity|
    |  5|  b|  B|   Infinity|
    |  6|  b|  C|   Infinity|
    +---+---+---+-----------+
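
    The string-building loop above could also be wrapped in a small reusable helper. The sketch below is only an illustration: the name build_case_expr and its parameters are hypothetical, not part of PySpark. It assumes the keys are plain strings without single quotes and the ranks are numeric, and it does no escaping.

```python
def build_case_expr(column, order, default="cast('Infinity' as double)"):
    """Build a SQL CASE expression that maps known values to their rank.

    column  -- name of the column to inspect (hypothetical parameter)
    order   -- dict of {value: rank}; values not listed get `default`
    default -- SQL snippet used in the ELSE branch
    """
    if not order:
        # A CASE expression needs at least one WHEN branch
        raise ValueError("order must contain at least one entry")

    # One WHEN branch per known value, in the dict's insertion order
    branches = [f"when '{key}' then {rank}" for key, rank in order.items()]
    return f"case {column} {' '.join(branches)} else {default} end"


custom_dict = {'c': 0, 'a': 1}
stmt = build_case_expr('c1', custom_dict)
print(stmt)
```

    The returned string can be passed to F.expr(stmt) exactly as in the answer above. Because the whole expression is evaluated inside Spark rather than in a Python UDF, rows do not need to be shipped to a Python worker to compute the rank.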