I have a PySpark function, shown below, in which each new column is a new feature. For example, journey_email_been_sent_flag, journey_opened_flag, journey_clicked_flag, and journey_transaction_flag are new features. I want to create a function so that when a user asks for one of these features, I can retrieve the result for them. The basic idea is to make the features reusable from a central repository. Is there a way to achieve this dynamically for each feature?
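To make the intent concrete, this is roughly the call I would like consumers of the repository to be able to make (get_features is just a placeholder name for the function I am asking about, and base_df stands for whatever base DataFrame the features are built on):

# Desired usage: the caller names the features, and the central function
# returns the base data with exactly those feature columns attached.
result = get_features(base_df, ['journey_opened_flag', 'journey_transaction_flag'])

My current code is below.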
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

journey_level_revenue_email_open_click = spark.read.parquet(journey_level_revenue_path)
analysis_start_date = "2019-05-06"

def df_ptf_overall(df: DataFrame, startdate: str):
    # Base projection: filter on the journey start date and keep the raw measurement columns.
    ptf_overall1 = df \
        .filter(F.col('journey_start_date') >= startdate) \
        .select('bpid',
                'journeyinstanceid',
                'journeyid',
                'journey_start_date',
                'measurement_group',
                'country',
                'email_14days',
                'opened_14days',
                'clicked_14days',
                'testfnemail_14days',
                'testfnopened_14days',
                'testfnclicked_14days',
                'revenue_14days',
                'num_trx_14days',
                'num_items_bought_14days')
    return ptf_overall1
#display(df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date))
def df_ptf_overall2(df: DataFrame, startdate: str):
    # Derive the binary feature flags from the base columns, for the test measurement group only.
    ptf_overall2 = df_ptf_overall(df, startdate).filter('measurement_group = "test"') \
        .withColumn('journey_email_been_sent_flag', F.when(F.col('email_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
        .withColumn('journey_opened_flag', F.when(F.col('opened_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
        .withColumn('journey_clicked_flag', F.when(F.col('clicked_14days') > 0, F.lit(1)).otherwise(F.lit(0))) \
        .withColumn('journey_transaction_flag', F.when(F.col('revenue_14days') > 0, F.lit(1)).otherwise(F.lit(0)))
    return ptf_overall2
#display(df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date))
feature_columns = ['journey_email_been_sent_flag', 'journey_opened_flag']
df = df_ptf_overall(journey_level_revenue_email_open_click, analysis_start_date)

def feature_result(df: DataFrame, features: list):
    # Keep all of the base columns and add only the requested feature columns.
    input_columns = df.columns
    selected_columns = input_columns + features
    return df_ptf_overall2(journey_level_revenue_email_open_click, analysis_start_date).select(*selected_columns).show()

feature_result(df, feature_columns)
I was able to implement something like the above to achieve it.
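What I am really after, though, is driving this from a central mapping of feature names to their column expressions, so that any requested feature can be looked up and attached dynamically. A rough sketch of what I am imagining (FEATURE_EXPRESSIONS and get_features are placeholder names, not existing code):

# Central "repository" of feature definitions: feature name -> column expression.
# Each expression is built from the base columns returned by df_ptf_overall.
FEATURE_EXPRESSIONS = {
    'journey_email_been_sent_flag': F.when(F.col('email_14days') > 0, F.lit(1)).otherwise(F.lit(0)),
    'journey_opened_flag': F.when(F.col('opened_14days') > 0, F.lit(1)).otherwise(F.lit(0)),
    'journey_clicked_flag': F.when(F.col('clicked_14days') > 0, F.lit(1)).otherwise(F.lit(0)),
    'journey_transaction_flag': F.when(F.col('revenue_14days') > 0, F.lit(1)).otherwise(F.lit(0)),
}

def get_features(df: DataFrame, features: list) -> DataFrame:
    # Look up each requested feature by name and attach it as a new column.
    unknown = set(features) - set(FEATURE_EXPRESSIONS)
    if unknown:
        raise ValueError(f"Unknown features requested: {unknown}")
    return df.select('*', *[FEATURE_EXPRESSIONS[name].alias(name) for name in features])

# Only the requested features are computed and returned alongside the base columns.
result = get_features(
    df_ptf_overall(journey_level_revenue_email_open_click, analysis_start_date)
        .filter('measurement_group = "test"'),
    ['journey_opened_flag', 'journey_transaction_flag'])

Would something along these lines be a reasonable basis for a reusable central feature repository, or is there a more standard way to achieve this dynamically in PySpark?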