Search code examples
apache-spark, pyspark, scikit-learn

Avoiding for loop in PySpark with Machine Learning


I have this for loop in PySpark that iterates over different products of a supermarket, but it takes a long time.

How to do this without using a for-loop?

from sklearn.linear_model import LinearRegression
# Fit one exponential regression per product, then join the predictions
# back onto the product's rows by a positional index.
for i in products:
    # selecting the products
    # NOTE(review): this REASSIGNS df, so after the first iteration df only
    # contains the first product's rows — later iterations filter an already
    # product-filtered frame. A separate per-product variable is needed.
    df = df.filter(F.col("product") == i)
    # keep only the discount levels used for training
    df_train = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))

    # exponential regression
    # log-transform the target so a linear fit yields an exponential model;
    # NOTE(review): X, Y_delta and X_test are not defined in this snippet —
    # presumably built from df_train elsewhere; confirm against full code.
    exp = LinearRegression().fit(X, np.log(Y_delta))
    exp_pred = np.exp(exp.predict(X_test))

    # preparing to add the predictions to my df
    schema_exp_delta = StructType([StructField("delta_exp_pred", DoubleType(), True)])
    # move the NumPy predictions into a one-column Spark DataFrame
    exp_delta = spark.createDataFrame(exp_pred.tolist(), schema = schema_exp_delta)
    # synthesize a positional row index to join on (ordering by
    # monotonically_increasing_id keeps the original row order)
    exp_delta = exp_delta.withColumn("row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))

    # NOTE(review): df.row_idx is never created above — this join only works
    # if df already carries a row_idx column from earlier code; verify.
    df = df.join(exp_delta, df.row_idx == exp_delta.row_idx)

Solution

  • You can use Spark's built-in functions to group your data by product and apply the calculations to each group in parallel, without using a for-loop. One way to achieve this is by using groupBy together with a grouped-map Pandas UDF (applied via GroupedData.apply, or applyInPandas in newer Spark versions), as shown below.

    from sklearn.linear_model import LinearRegression
    import pandas as pd
    import pyspark.sql.functions as F
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    # Define a grouped-map Pandas UDF. Spark calls it once per product group,
    # passing that group's rows as a pandas DataFrame; it must RETURN a pandas
    # DataFrame matching the declared output schema. It must NOT create Spark
    # DataFrames itself — the SparkSession does not exist on the executors.
    # The schema is given inline as a DDL string ("delta_exp_pred double")
    # because the original referenced schema_exp_delta before defining it.
    @pandas_udf("delta_exp_pred double", PandasUDFType.GROUPED_MAP)
    def exponential_regression(pdf):
        """Fit an exponential regression for one product group.

        Parameters: pdf — pandas DataFrame with the group's rows; must
        contain feature column 'x_col' and target column 'y_col'.
        Returns: pandas DataFrame with one column, delta_exp_pred.
        """
        X, Y = pdf[['x_col']], pdf[['y_col']]
        # log-transform the target so a linear fit yields an exponential model
        exp = LinearRegression().fit(X, np.log(Y))
        # Predict on the group's own features: a driver-side X_test is not
        # visible inside the worker-side UDF.
        exp_pred = np.exp(exp.predict(X))
        # Return a plain pandas DataFrame — Spark assembles the result.
        return pd.DataFrame({"delta_exp_pred": exp_pred.ravel()})
    
    # Group the data and apply regression to each group
    # Filter ONCE up front (instead of inside a per-product loop), then let
    # Spark run the UDF on every product group in parallel.
    df = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))
    # NOTE(review): GroupedData.apply is the legacy grouped-map API; newer
    # Spark versions prefer groupby(...).applyInPandas(fn, schema) — confirm
    # your Spark version.
    df = df.groupby('product').apply(exponential_regression)