Tags: python, apache-spark, pyspark, rdd

Pyspark calculate row-wise weighted average with null entries


I have multiple data frames with values that have been computed on different source data. For simplicity I'll give an example with three dataframes, but I'm looking for a solution that works for n dataframes.

data_1

+------+-----------+
|person|first_value|
+------+-----------+
|     1|        1.0|
|     2|        0.9|
|     3|        0.8|
|     4|        0.8|
+------+-----------+

data_2

+------+------------+
|person|second_value|
+------+------------+
|     1|         0.5|
|     2|         0.6|
|     4|         0.7|
+------+------------+

data_3

+------+-----------+
|person|third_value|
+------+-----------+
|     1|        0.2|
|     3|        0.9|
|     4|        0.6|
+------+-----------+

Now I want to calculate a weighted average across the two or more dataframes. For this I first merge the dataframes:

+------+-----------+------------+-----------+
|person|first_value|second_value|third_value|
+------+-----------+------------+-----------+
|     1|        1.0|         0.5|        0.2|
|     2|        0.9|         0.6|       null|
|     3|        0.8|        null|        0.9|
|     4|        0.8|         0.7|        0.6|
+------+-----------+------------+-----------+

The formula for the combined values would be:

val = val1 * weight1 + val2 * weight2 + val3 * weight3

However, if one of the values is null, the remaining weights should still add up to 1; so if val2 is null, weight2 should be distributed proportionally across the other weights. I just can't find an elegant way to do this.

With w1 = 0.3, w2 = 0.4, w3 = 0.3 I currently get this as a result of my formula:

+------+----+
|person| val|
+------+----+
|     3|null|
|     1|0.56|
|     4| 0.7|
|     2|null|
+------+----+

However I want this:

+------+-----+
|person|  val|
+------+-----+
|     1| 0.56|
|     2|0.729|  <- val1*weight1_adj2 + val2*weight2_adj2
|     3| 0.85|  <- val1*weight1_adj3 + val3*weight3_adj3
|     4|  0.7|
+------+-----+

with adjusted weights

weight1_adj2 = w1/(w1+w2) = 0.43
weight2_adj2 = w2/(w1+w2) = 0.57
weight1_adj3 = w1/(w1+w3) = 0.5
weight3_adj3 = w3/(w1+w3) = 0.5
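
For clarity, the arithmetic behind those adjusted weights can be checked in plain Python (this is just the expected-output calculation for persons 2 and 3, not Spark code):

w1, w2, w3 = 0.3, 0.4, 0.3

# person 2: third_value is null, so the weight is split between w1 and w2
print(0.9 * w1 / (w1 + w2) + 0.6 * w2 / (w1 + w2))   # ~0.729

# person 3: second_value is null, so the weight is split between w1 and w3
print(0.8 * w1 / (w1 + w3) + 0.9 * w3 / (w1 + w3))   # 0.85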

Is there any way of solving this in PySpark or even SQL, or will I have to write a UDF?

Here is my current code that does not handle null values:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# get or create a Spark session (assumes a working Spark installation)
spark = SparkSession.builder.getOrCreate()

data1 = [("1",1.0), 
        ("2",0.9), 
        ("3",0.8), 
        ("4",0.8) 
      ]

schema1 = ["person","first_value"]
first_df = spark.createDataFrame(data=data1, schema = schema1)

data2 = [("1",0.5), 
        ("2",0.6), 
        ("4",0.7) 
      ]

schema2 = ["person","second_value"]
second_df = spark.createDataFrame(data=data2, schema = schema2)

data3 = [("1",0.2), 
        ("3",0.9), 
        ("4",0.6) 
      ]

schema3 = ["person","third_value"]
third_df = spark.createDataFrame(data=data3, schema = schema3)

combined_df = first_df.join(
  second_df, ['person'], how='full'
).join(
  third_df, ['person'], how='full'
)

w1 = 0.3
w2 = 0.4
w3 = 0.3
combined_df.groupBy(['person']).agg(
  F.sum(
    col('first_value')*w1 + col('second_value')*w2  + col('third_value')*w3
  ).alias('val')).show()

Edit 1: I am not asking about row-wise addition with null values as described here: Spark dataframe not adding columns with null values - I need to handle the weights so that the sum of the weights applied to non-null values is always 1.


Solution

  • The idea is to sum all weights per row where the columns are not null and then divide the individual weights by this sum.

    To get some flexibility with the number of columns and their weights I store the weights in a dict, using the column name as key:

    weights = {"first_value": 0.3, "second_value": 0.4, "third_value": 0.3}
    

    Then I can iterate over the dict to

    • calculate the sum of the weights for the non-null columns
    • and then calculate, over all non-null columns, the sum of column value * weight / sum of weights:

    wf = "1 / ("
    val = ""
    for col in weights:
        wf += f"if({col} is null,0 ,{weights[col]}) + "
        val += f"if( {col} is null, 0, {col} * {weights[col]} * weight_factor) + "
    wf += "0 )"
    val += "0"
    
    combined_df = combined_df.withColumn("weight_factor", F.expr(wf)) \
        .withColumn("val", F.expr(val))
    

    Output:

    +------+-----------+------------+-----------+-----------------+------------------+
    |person|first_value|second_value|third_value|    weight_factor|               val|
    +------+-----------+------------+-----------+-----------------+------------------+
    |     1|        1.0|         0.5|        0.2|1.000000000000000|              0.56|
    |     2|        0.9|         0.6|       null|1.428571428571429|0.7285714285714289|
    |     3|        0.8|        null|        0.9|1.666666666666667|0.8500000000000002|
    |     4|        0.8|         0.7|        0.6|1.000000000000000|               0.7|
    +------+-----------+------------+-----------+-----------------+------------------+
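
    If you prefer to stay in the DataFrame API instead of building SQL strings, the same idea can also be sketched with Column expressions (a rough sketch, assuming the weights dict and combined_df from above):

    from functools import reduce
    from operator import add
    from pyspark.sql import functions as F

    # per row: sum of the weights of the non-null columns
    weight_sum = reduce(add, [
        F.when(F.col(c).isNotNull(), F.lit(w)).otherwise(F.lit(0.0))
        for c, w in weights.items()
    ])

    # per row: weighted sum over the non-null columns (null contributions count as 0)
    weighted_vals = reduce(add, [
        F.coalesce(F.col(c) * F.lit(w), F.lit(0.0))
        for c, w in weights.items()
    ])

    combined_df = combined_df.withColumn("val", weighted_vals / weight_sum)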
    

    As a next step, you can proceed with the aggregation and sum over val.
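
    Assuming you keep the question's groupBy pattern (person and val are the column names used above), that aggregation could look roughly like this:

    combined_df.groupBy("person").agg(
        F.sum("val").alias("val")
    ).show()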