Tags: pyspark, window, user-defined-functions, populate, partition

With PySpark how do I populate values in a column based on either groupby/window/partition and perform a UDF?


I am trying to populate missing values in a column. Within each group/partition (rows ordered by date, descending), the profile value appears in the first row or in one of the following rows, and that value has to be propagated down to the rows below it in the profile column.

I tried to do this with a window function, but I was not able to apply a UDF over a window.

valuesA = [('1',"", "20190108"),('1',"", "20190107"),('1',"abcd", "20190106"),('1',"", "20190105"),('1',"", "20190104"),('2',"wxyz", "20190103"),('2',"", "20190102"),('2',"", "20190101")]
TableA = spark.createDataFrame(valuesA,['vid','profile', 'date'])

valuesB = [('1',"null", "20190108"),('1',"null", "20190107"),('1',"abcd", "20190106"),('1',"abcd", "20190105"),('1',"abcd", "20190104"),('2',"wxyz", "20190103"),('2', "wxyz", "20190102"),('2', "wxyz", "20190101")]
TableB = spark.createDataFrame(valuesB,['vid','profile', 'date'])

TableA.show()
TableB.show()
Table A: This is what I have. 
+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|       |20190108|
|  1|       |20190107|
|  1|   abcd|20190106|
|  1|       |20190105|
|  1|       |20190104|
|  2|   wxyz|20190103|
|  2|       |20190102|
|  2|       |20190101|
+---+-------+--------+

Table B: What I am expecting. 
+---+-------+--------+
|vid|profile|    date|
+---+-------+--------+
|  1|   null|20190108|
|  1|   null|20190107|
|  1|   abcd|20190106|
|  1|   abcd|20190105|
|  1|   abcd|20190104|
|  2|   wxyz|20190103|
|  2|   wxyz|20190102|
|  2|   wxyz|20190101|
+---+-------+--------+


Solution

  • You can use the last window function. Note - the first withColumn replaces all the empty strings with nulls; last is then called with its second argument set to True (ignorenulls=True, which is not the default), so it skips nulls, which in this case is what we want.

    from pyspark.sql.window import Window
    from pyspark.sql.functions import col, last, length, lit, when

    TableB = TableA.withColumn('profile', when(length('profile') == 0, lit(None)).otherwise(col('profile')))\
        .withColumn("profile", last(col('profile'), True).over(Window.partitionBy('vid').orderBy(col('date').desc())))
    
    TableB.show()
    

    Output:

    +---+-------+--------+
    |vid|profile|    date|
    +---+-------+--------+
    |  1|   null|20190108|
    |  1|   null|20190107|
    |  1|   abcd|20190106|
    |  1|   abcd|20190105|
    |  1|   abcd|20190104|
    |  2|   wxyz|20190103|
    |  2|   wxyz|20190102|
    |  2|   wxyz|20190101|
    +---+-------+--------+