pyspark window-functions

How to add columns in pyspark dataframe dynamically


I am trying to add a few columns based on the input variable vIssueCols:

from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.window import Window
vIssueCols=['jobid','locid']
vQuery1 = 'vSrcData2= vSrcData'
vWindow1 = Window.partitionBy("vKey").orderBy("vOrderBy")
for x in vIssueCols:
    vQuery1=vQuery1+'.withColumn("'+x+'_prev",F.lag(vSrcData.'+x+').over(vWindow1))'

exec(vQuery1)

The loop above builds vQuery1 as the string below, and running it with exec works:

vSrcData2= vSrcData.withColumn("jobid_prev",F.lag(vSrcData.jobid).over(vWindow1)).withColumn("locid_prev",F.lag(vSrcData.locid).over(vWindow1))

Can't I write a query something like

vSrcData2= vSrcData.withColumn(x+"_prev",F.lag(vSrcData.x).over(vWindow1))for x in vIssueCols

and generate the columns with the loop statement? Some blogs suggest adding a UDF and calling that, but rather than use a UDF I would stay with the string-exec method above.


Solution

  • You can build the chain of withColumn calls with functools.reduce instead of assembling a string for exec.

    from pyspark.sql.functions import lag
    from pyspark.sql.window import Window
    from functools import reduce
    
    #sample data (sc is the SparkContext, e.g. the one predefined in the pyspark shell)
    df = sc.parallelize([[1, 200, '1234', 'asdf'],
                         [1, 50, '2345', 'qwerty'],
                         [1, 100, '4567', 'xyz'],
                         [2, 300, '123', 'prem'],
                         [2, 10, '000', 'ankur']]).\
        toDF(["vKey","vOrderBy","jobid","locid"])
    df.show()
    
    vWindow1 = Window.partitionBy("vKey").orderBy("vOrderBy")
    
    #your existing processing
    df1= df.\
        withColumn("jobid_prev",lag(df.jobid).over(vWindow1)).\
        withColumn("locid_prev",lag(df.locid).over(vWindow1))
    df1.show()
    
    #to-be processing: same columns, built dynamically from the list
    vIssueCols=['jobid','locid']
    df2 = (reduce(
        lambda r_df, col_name: r_df.withColumn(col_name+"_prev", lag(r_df[col_name]).over(vWindow1)),
        vIssueCols,
        df
    ))
    df2.show()
    

    Sample data:

    +----+--------+-----+------+
    |vKey|vOrderBy|jobid| locid|
    +----+--------+-----+------+
    |   1|     200| 1234|  asdf|
    |   1|      50| 2345|qwerty|
    |   1|     100| 4567|   xyz|
    |   2|     300|  123|  prem|
    |   2|      10|  000| ankur|
    +----+--------+-----+------+
    

    Output:

    +----+--------+-----+------+----------+----------+
    |vKey|vOrderBy|jobid| locid|jobid_prev|locid_prev|
    +----+--------+-----+------+----------+----------+
    |   1|      50| 2345|qwerty|      null|      null|
    |   1|     100| 4567|   xyz|      2345|    qwerty|
    |   1|     200| 1234|  asdf|      4567|       xyz|
    |   2|      10|  000| ankur|      null|      null|
    |   2|     300|  123|  prem|       000|     ankur|
    +----+--------+-----+------+----------+----------+
    

    Hope this helps!
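
The reduce call above is equivalent to a plain for loop that keeps reassigning the DataFrame, which is close to the loop form the question asks for. The sketch below shows that equivalence without needing a Spark session. `RecordingFrame` and `add_derived_columns` are hypothetical names introduced only for illustration; the stand-in class merely records column names and does not emulate Spark beyond returning a new object from withColumn.

```python
from functools import reduce


def add_derived_columns(frame, col_names, make_expr, suffix="_prev"):
    """Add one derived column per name using a plain loop.

    `frame` is anything exposing withColumn(name, expr) that returns a
    new frame, which is how a PySpark DataFrame behaves.
    """
    for name in col_names:
        frame = frame.withColumn(name + suffix, make_expr(name))
    return frame


class RecordingFrame:
    """Hypothetical stand-in for a DataFrame: it only records column names."""

    def __init__(self, columns):
        self.columns = list(columns)

    def withColumn(self, name, expr):
        # Return a new object instead of mutating this one.
        return RecordingFrame(self.columns + [name])


base = RecordingFrame(["vKey", "vOrderBy", "jobid", "locid"])
issue_cols = ["jobid", "locid"]

# Loop version: each iteration feeds the previous result back in.
looped = add_derived_columns(base, issue_cols, lambda c: "lag(" + c + ")")

# reduce version, same shape as the lambda in the answer.
folded = reduce(
    lambda acc, c: acc.withColumn(c + "_prev", "lag(" + c + ")"),
    issue_cols,
    base,
)

print(looped.columns)
print(folded.columns)
```

Both print the four original names followed by jobid_prev and locid_prev. With a real DataFrame, the call would presumably look like `add_derived_columns(df, vIssueCols, lambda c: lag(c).over(vWindow1))`, assuming `df`, `lag` and `vWindow1` are defined as in the answer.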