python, apache-spark, pyspark, apache-spark-sql, apache-spark-ml

Compute and compare the average of two columns


I started converting my Pandas implementations to PySpark, but I'm having trouble with some basic operations. So I have this table:

+-----+-----+----+
| Col1|Col2 |Col3|
+-----+-----+----+
|  1  |[1,3]|   0|
|  44 |[2,0]|   1|
|  77 |[1,5]|   7|
+-----+-----+----+

My desired output is:

+-----+-----+----+----+
| Col1|Col2 |Col3|Col4|
+-----+-----+----+----+
|  1  |[1,3]|   0|2.67|
|  44 |[2,0]|   1|2.67|
|  77 |[1,5]|   7|2.67|
+-----+-----+----+----+

To get here:

  • I averaged the first item of every array in Col2 and the second item of every array in Col2. Since the average of the second "sub-column" ((3+0+5)/3) is bigger than that of the first ((1+2+1)/3), it is the "winning" one. I then created a new column that has the "winning" average replicated over every row of the table (3 rows in this example). I was already able to do this by "manually" selecting the column, averaging it, and then using lit to replicate the result (a sketch of that approach is shown below). The problem with my implementation is that collect() takes a lot of time and, as far as I know, is not recommended. Could you please help me with this?
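
For reference, a minimal sketch of the collect-plus-lit approach described above might look like this (assuming Col2 holds Spark ML vectors, as the answer below implies; to_array is just an illustrative helper name):

    from pyspark.sql import functions as F
    
    # Illustrative helper (assumption): Col2 holds ML vectors, so convert them to arrays of doubles
    to_array = F.udf(lambda v: [float(x) for x in v.toArray()], 'array<double>')
    
    # Compute both per-position averages on the driver, then broadcast the larger one with lit()
    row = df.select(
        F.avg(to_array('Col2')[0]).alias('a0'),
        F.avg(to_array('Col2')[1]).alias('a1'),
    ).collect()[0]
    
    df_out = df.withColumn('Col4', F.lit(max(row['a0'], row['a1'])))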

Solution

  • You can use greatest to take the greatest of the per-position ("sub-column") averages:

    from pyspark.sql import functions as F, Window
    
    # Col2 is an ML vector, so first convert it to an array of doubles
    to_array = F.udf(lambda v: [float(x) for x in v.toArray()], 'array<double>')
    
    # Average each position of Col2 over all rows, then keep the greatest of the averages
    df2 = df.withColumn(
        'Col4',
        F.greatest(*[
            F.avg(to_array('Col2')[i]).over(Window.orderBy())
            for i in range(2)
        ])
    )
    
    df2.show()
    +----+------+----+------------------+
    |Col1|  Col2|Col3|              Col4|
    +----+------+----+------------------+
    |   1|[1, 3]|   0|2.6666666666666665|
    |  44|[2, 0]|   1|2.6666666666666665|
    |  77|[1, 5]|   7|2.6666666666666665|
    +----+------+----+------------------+
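
    If you want Col4 rounded to two decimals as in the desired output above, you can wrap the result in F.round (a small addition on top of the answer, not part of the original):

    # Optional: round the winning average to two decimals, matching the desired 2.67
    df2 = df2.withColumn('Col4', F.round('Col4', 2))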
    

    If you want the array size to be dynamic, you can do

    # Determine the largest array size in Col2 so the comprehension covers every position
    arr_size = df.select(F.max(F.size(to_array('Col2')))).head()[0]
    
    df2 = df.withColumn(
        'Col4',
        F.greatest(*[
            F.avg(to_array('Col2')[i]).over(Window.orderBy())
            for i in range(arr_size)
        ])
    )
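
    As a side note, a window without partitionBy moves all rows to a single partition. A possible alternative (my own sketch, not part of the original answer) is to aggregate once and cross-join the single-row result back onto the table:

    # Alternative sketch: compute the greatest per-position average once, then attach it to every row
    best = df.select(
        F.greatest(*[F.avg(to_array('Col2')[i]) for i in range(arr_size)]).alias('Col4')
    )
    df2 = df.crossJoin(best)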