I have to apply conditions to PySpark DataFrames based on a distribution.
My distribution looks like this:
mp = [413, 291, 205, 169, 135]
I build the condition expression like this:
when_decile = (F.when((F.col(colm) >= float(mp[0])), F.lit(1))
    .when((F.col(colm) >= float(mp[1])) & (F.col(colm) < float(mp[0])), F.lit(2))
    .when((F.col(colm) >= float(mp[2])) & (F.col(colm) < float(mp[1])), F.lit(3))
    .when((F.col(colm) >= float(mp[3])) & (F.col(colm) < float(mp[2])), F.lit(4))
    .when((F.col(colm) >= float(mp[4])) & (F.col(colm) < float(mp[3])), F.lit(5))
    .otherwise(F.lit(-99)))
Then I apply it to the DataFrame:
df_temp = df_temp.withColumn('decile_rank', when_decile)
Now I have to keep this code in a function which receives 'mp' and 'df_temp' as inputs. The length of 'mp' is variable.
So now I build the condition expression as a string:
when_decile = '(F.when((F.col(colm) >= float(' + str(mp[0]) + ')), F.lit(1))'
for i in range(len(mp)-1):
    when_decile += '.when( (F.col(colm) >= float(' + str(mp[i+1]) + ')) & (F.col(colm) < float(' + str(mp[i]) + ')), F.lit(' + str(i+2) + '))'
when_decile += '.otherwise(F.lit(-99)))'
The problem now is that 'when_decile' is a string and it cannot be applied to 'df_temp'.
How can I convert this string to a condition?
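Try this. eval turns the string into a Column expression, provided F and colm are in scope where it runs and the string is valid Python: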
df_temp = df_temp.withColumn('decile_rank', eval(when_decile))
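If you would rather avoid eval, one possible alternative is to build the Column expression directly in a loop instead of assembling a string. The following is only a minimal sketch, assuming 'mp' is non-empty and sorted in descending order as in the question. decile_bounds and add_decile_rank are hypothetical helper names, not part of PySpark.

```python
def decile_bounds(mp):
    """Return (lower, upper, rank) triples; upper is None for the top bucket."""
    bounds = [(float(mp[0]), None, 1)]
    for i in range(len(mp) - 1):
        bounds.append((float(mp[i + 1]), float(mp[i]), i + 2))
    return bounds


def add_decile_rank(df_temp, colm, mp):
    """Add a 'decile_rank' column to df_temp based on the thresholds in mp."""
    # Imported here so decile_bounds can be checked without Spark installed.
    from pyspark.sql import functions as F

    expr = None
    for lower, upper, rank in decile_bounds(mp):
        cond = F.col(colm) >= lower
        if upper is not None:
            cond = cond & (F.col(colm) < upper)
        # The first branch starts the chain with F.when; later ones extend it.
        if expr is None:
            expr = F.when(cond, F.lit(rank))
        else:
            expr = expr.when(cond, F.lit(rank))
    # Values below the smallest threshold get -99, as in the question.
    return df_temp.withColumn('decile_rank', expr.otherwise(F.lit(-99)))
```

It would be called as df_temp = add_decile_rank(df_temp, colm, mp). Because no string is evaluated, there are no parentheses to balance by hand and 'mp' can have any length of at least one.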