Search code examples
python pyspark databricks feature-extraction feature-selection

featurestore functionality in python/databricks


I have a PySpark function, shown below, in which each new column is a new feature. For example, journey_email_been_sent_flag, journey_opened_flag, journey_clicked_flag, and journey_transaction_flag are new features. I want to create a function such that, if a user asks for one of these features, I can retrieve the result for that user. The basic idea behind this is the reusability of features from a central repository. Is there a way to achieve this dynamically for each feature?

# Earliest journey start date to include; passed as `startdate` to the functions below.
analysis_start_date = "2019-05-06"
# Journey-level source data, loaded from the parquet files at `journey_level_revenue_path`.
journey_level_revenue_email_open_click = spark.read.parquet(journey_level_revenue_path)
def df_ptf_overall(df: DataFrame, startdate: str) -> DataFrame:
    """Return the journey-level columns for journeys starting on or after ``startdate``.

    Args:
        df: Journey-level DataFrame. It must contain ``journey_start_date`` and
            every column listed in ``selected_columns`` below.
        startdate: Inclusive lower bound for ``journey_start_date``, given as a
            plain Python string such as ``"2019-05-06"``.

    Returns:
        ``df`` filtered to ``journey_start_date >= startdate`` and restricted
        to the selected columns.
    """
    # Identifiers, grouping attributes and the 14-day measures kept for
    # downstream feature derivation.
    selected_columns = (
        'bpid',
        'journeyinstanceid',
        'journeyid',
        'journey_start_date',
        'measurement_group',
        'country',
        'email_14days',
        'opened_14days',
        'clicked_14days',
        'testfnemail_14days',
        'testfnopened_14days',
        'testfnclicked_14days',
        'revenue_14days',
        'num_trx_14days',
        'num_items_bought_14days',
    )
    # Parenthesised chain instead of backslash continuations: a stray trailing
    # space after a backslash would otherwise be a syntax error.
    return (
        df
        .filter(F.col('journey_start_date') >= startdate)
        .select(*selected_columns)
    )

#display(df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date))

def df_ptf_overall2(df: DataFrame, startdate: str) -> DataFrame:
    """Return the "test" measurement-group journeys with binary feature flags added.

    The input is first reduced by ``df_ptf_overall(df, startdate)``, then
    filtered to ``measurement_group = "test"``. Each flag column is 1 when its
    source column is greater than 0 and 0 otherwise.

    Args:
        df: Journey-level DataFrame accepted by ``df_ptf_overall``.
        startdate: Inclusive lower bound for ``journey_start_date``, given as a
            plain Python string such as ``"2019-05-06"``.

    Returns:
        The filtered DataFrame with the four ``journey_*_flag`` columns appended.
    """
    # (new flag column, source column) pairs; the order matches the column
    # order of the result.
    flag_sources = (
        ('journey_email_been_sent_flag', 'email_14days'),
        ('journey_opened_flag', 'opened_14days'),
        ('journey_clicked_flag', 'clicked_14days'),
        ('journey_transaction_flag', 'revenue_14days'),
    )
    flagged = df_ptf_overall(df, startdate).filter('measurement_group = "test"')
    # One shared expression replaces four copy-pasted withColumn calls, so a
    # new flag only needs a new entry in `flag_sources`.
    for flag_name, source_column in flag_sources:
        flagged = flagged.withColumn(
            flag_name,
            F.when(F.col(source_column) > 0, F.lit(1)).otherwise(F.lit(0)),
        )
    return flagged

#display(df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date))

Solution

  • feature_columns = ['journey_email_been_sent_flag','journey_opened_flag'] df = df_ptf_overall(journey_level_revenue_email_open_click,analysis_start_date)

    def feature_result(df:DataFrame, features : ArrayType): input_columns = df.columns selected_columns = input_columns + feature_columns return df_ptf_overall2(journey_level_revenue_email_open_click,analysis_start_date).select(*selected_columns).show()

    feature_result(df,feature_columns)

    I could implement something like this to achieve it