Tags: machine-learning, pyspark, normalization, feature-engineering, label-encoding

How can I allocate statistical frequency to records/rows of a dataframe in PySpark without using the .toPandas() hack?


I'm a newbie in PySpark, and I want to translate my preprocessing scripts (the encoding and normalizing parts), which are plain Python/Pandas, into PySpark for synthetic data. (Columns A and C are categorical.) To start with, I have a Spark dataframe called sdf with 5 columns.

Below is an example:

#+----------+-----+---+-------+----+
#|A         |B    |C  |D      |E   |
#+----------+-----+---+-------+----+
#|Sentence  |92   |6  |False  |49  |
#|Sentence  |17   |3  |False  |15  |
#|Sentence  |17   |3  |False  |15  |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#|-         |0    |0  |False  |0   |
#+----------+-----+---+-------+----+

Now I want to attach the statistical frequency of each row alongside the other features and concatenate the result with sdf. So far, I can do it with Pandas:

#import libs
import copy
import numpy as np
import pandas as pd

from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import scale
from sklearn import preprocessing

#Statistical Preprocessing
def add_freq_to_features(df):
  frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
  frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum()  # normalize counts to relative frequencies in [0, 1]
  new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
  
  return new_df

# Encode and Normalize
def normalize_features(df):
  temp_df = df.copy()
  
  le = preprocessing.LabelEncoder()
  #le.fit(temp_df)
    
  temp_df[["A", "C"]] = temp_df[["A", "C"]].apply(le.fit_transform)
  
  for column in ["A", "B", "C", "D", "E"]:
    # reshape(-1, 1): treat the column values as a single feature with one value per row
    temp_df[column] = MinMaxScaler().fit_transform(temp_df[column].values.reshape(-1, 1))
    
  return temp_df

# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(features_df)

#Apply Encoding and Normalizing function
normalized_features_df = normalize_features(features_df)


to_numeric_columns = ["A", "B" , "C", "D", "E", "Freq"]
normalized_features_df[to_numeric_columns] = normalized_features_df[to_numeric_columns].apply(pd.to_numeric)
#normalized_features_df

Problem: what is the best approach to translating this preprocessing into PySpark without converting the Spark dataframe to a Pandas dataframe via toPandas(), so that the pipeline is optimized and runs 100% in Spark?

The expected output is shown below in the form of a Spark dataframe:

#+----------+-----+---+-------+----+----------+
#|A         |B    |C  |D      |E   |Freq      |
#+----------+-----+---+-------+----+----------+
#|Sentence  |92   |6  |False  |49  |0.166667  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|Sentence  |17   |3  |False  |15  |0.333333  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#|-         |0    |0  |False  |0   |0.500000  |
#+----------+-----+---+-------+----+----------+

Solution

  • Spark ships with the Spark MLlib package, which is designed for feature engineering and machine learning. That being said, you shouldn't build features manually the way you do with Pandas. At the end of the day, you still have to use Spark to build your models, so why not start using Spark ML properly? I strongly suggest reading through a few sections of its documentation, such as building features, building pipelines, then classification/regression, and a few other algorithms.

    Going back to your original question, this is the Spark version of your sample code (I also ran it in your notebook with a small change to fit your variables).

    # this is to build the "raw" Freq (count of each distinct row)
    from pyspark.sql import functions as F

    sdf2 = (sdf
        .groupBy(sdf.columns)
        .agg(F.count('*').alias('Freq'))
        .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
    )
    sdf2.cache().count()
    sdf2.show()
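
    Note that the groupBy above collapses duplicate rows, whereas your expected output keeps all six rows and attaches a relative frequency (the count of that exact row combination divided by the total row count). If that is the behaviour you need, a window function gives it to you without any join or toPandas(). This is a minimal sketch over the original sdf from the question; sdf_freq is just an illustrative name:

    from pyspark.sql import functions as F
    from pyspark.sql import Window

    # one window partition per distinct combination of values, i.e. per distinct row
    w = Window.partitionBy(*sdf.columns)
    total = sdf.count()  # total number of rows

    # count of the row's combination divided by the total -> relative frequency in (0, 1]
    sdf_freq = sdf.withColumn('Freq', F.count(F.lit(1)).over(w) / F.lit(total))
    sdf_freq.show(truncate=False)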
    
    # this is to normalize features using MinMaxScaler
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.feature import MinMaxScaler
    
    type_indexer = StringIndexer(inputCol='Type', outputCol='Type_Cat')
    encoding_indexer = StringIndexer(inputCol='Encoding_type', outputCol='Encoding_Type_Cat')
    assembler = VectorAssembler(inputCols=['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq'], outputCol='features')
    scaler = MinMaxScaler(inputCol='features', outputCol='scaled_features')
    pipeline = Pipeline(stages=[type_indexer, encoding_indexer, assembler, scaler])
    
    # Compute summary statistics and generate model
    model = pipeline.fit(sdf2)
    
    # rescale each feature to the range [0, 1] (the scaler's default [min, max])
    model.transform(sdf2).show(10, False)
    
    # Output
    # +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
    # |Type  |Length|Token_number|Encoding_type|Character_feature|Freq|Type_Cat|Encoding_Type_Cat|features                 |scaled_features          |
    # +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
    # |String|8     |0           |true         |7                |1   |0.0     |0.0              |[0.0,8.0,0.0,0.0,7.0,1.0]|[0.5,1.0,0.5,0.5,1.0,0.5]|
    # |String|0     |0           |true         |0                |1   |0.0     |0.0              |(6,[5],[1.0])            |[0.5,0.0,0.5,0.5,0.0,0.5]|
    # +------+------+------------+-------------+-----------------+----+--------+-----------------+-------------------------+-------------------------+
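
    If you would rather get the scaled values back as ordinary columns (closer to the shape of your Pandas result) instead of a single vector column, here is a minimal sketch. It assumes Spark 3.x, where pyspark.ml.functions.vector_to_array is available, and the scaled_cols list simply mirrors the assembler's inputCols:

    from pyspark.ml.functions import vector_to_array
    from pyspark.sql import functions as F

    scaled_cols = ['Type_Cat', 'Length', 'Token_number', 'Encoding_Type_Cat', 'Character_feature', 'Freq']

    result = (model.transform(sdf2)
        # turn the ML vector into a plain array so it can be indexed with []
        .withColumn('scaled', vector_to_array(F.col('scaled_features')))
        # keep the original columns and add one min-max-scaled column per feature
        .select(*sdf2.columns,
                *[F.col('scaled')[i].alias(c + '_scaled') for i, c in enumerate(scaled_cols)]))
    result.show(truncate=False)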