Tags: python, pyspark, apache-spark-mllib, normalize

Normalize a PySpark DataFrame by group


I want to normalize my DataFrame in PySpark by group. The solution proposed here does not help, as I want to transform every column in my DataFrame. The code I used in Python on a pandas DataFrame is the following:

df_norm = (X_df
    .groupby('group')
    .transform(lambda x: (x - x.min()) / (x.max() - x.min()))
    .fillna(0))

How can I do this in PySpark, either with a DataFrame or with an RDD?

Example input:

columns = ['group', 'sensor1', 'sensor2', 'sensor3']
vals = [
    ('a', 0.8, 0.02, 100),
    ('a', 0.5, 0.1, 200),
    ('a', 1, 0.5, 50),
    ('a', 0, 0.8, 30),
    ('b', 10, 1, 0),
    ('b', 20, 2, 3),
    ('b', 5, 4, 1),
]

desired output:

columns = ['group', 'sensor1', 'sensor2', 'sensor3']
vals = [
    ('a', 0.8, 0, 0.4118),
    ('a', 0.5, 0.1026, 1),
    ('a', 1, 0.6154, 0.1176),
    ('a', 0, 1, 0),
    ('b', 0.333, 0, 0),
    ('b', 1, 0.333, 1),
    ('b', 0, 1, 0.333),
]
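
For reference, here is one way to build this sample DataFrame in PySpark (a minimal sketch; the `spark` session setup and the all-float literals are my additions, since Spark cannot merge int and float values inferred for the same column):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

columns = ['group', 'sensor1', 'sensor2', 'sensor3']
vals = [
    ('a', 0.8, 0.02, 100.0),
    ('a', 0.5, 0.1, 200.0),
    ('a', 1.0, 0.5, 50.0),
    ('a', 0.0, 0.8, 30.0),
    ('b', 10.0, 1.0, 0.0),
    ('b', 20.0, 2.0, 3.0),
    ('b', 5.0, 4.0, 1.0),
]
# Sensor values are written as floats so each column is inferred as DoubleType.
df = spark.createDataFrame(vals, columns)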

Solution

  • I ended up doing it this way:

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    # Min-max normalize each sensor column to [0, 1] within its group,
    # using min/max window aggregates over the group partition.
    cols_to_normalize = ['sensor1', 'sensor2', 'sensor3']
    w = Window.partitionBy('group')
    for c in cols_to_normalize:
        df = (df.withColumn('mini', F.min(c).over(w))
                .withColumn('maxi', F.max(c).over(w))
                .withColumn(c, (F.col(c) - F.col('mini')) / (F.col('maxi') - F.col('mini')))
                .drop('mini', 'maxi'))
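
As a follow-up sketch (same assumptions as above: `df` and `cols_to_normalize` already defined), the loop can also be collapsed into a single select, which keeps the query plan flat instead of stacking one projection per withColumn call. Note that Spark's division returns null where max equals min, so an explicit na.fill(0) is needed to mirror the pandas .fillna(0):

    from pyspark.sql import Window
    from pyspark.sql import functions as F

    w = Window.partitionBy('group')

    # Compute every normalized column in one projection.
    df_norm = df.select(
        'group',
        *[((F.col(c) - F.min(c).over(w))
           / (F.max(c).over(w) - F.min(c).over(w))).alias(c)
          for c in cols_to_normalize]
    ).na.fill(0)  # max == min divides by zero -> null; mirror the pandas fillna(0)

    df_norm.show()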