I have a for loop in PySpark that iterates over the different products of a supermarket, but it takes a long time. How can I do this without using a for loop?
import numpy as np
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, DoubleType
from sklearn.linear_model import LinearRegression

for i in products:
    # selecting the current product (per-product frame, so df is not overwritten each iteration)
    df_i = df.filter(F.col("product") == i)
    df_train = df_i.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))
    # exponential regression: linear fit on the log-transformed target
    exp = LinearRegression().fit(X, np.log(Y_delta))
    exp_pred = np.exp(exp.predict(X_test))
    # preparing to add the predictions to my df
    schema_exp_delta = StructType([StructField("delta_exp_pred", DoubleType(), True)])
    exp_delta = spark.createDataFrame(exp_pred.tolist(), schema=schema_exp_delta)
    exp_delta = exp_delta.withColumn(
        "row_idx", F.row_number().over(Window.orderBy(F.monotonically_increasing_id()))
    )
    df_i = df_i.join(exp_delta, df_i.row_idx == exp_delta.row_idx)
You can use Spark's built-in grouping to apply the calculation to each product in parallel, without a for loop. One way to achieve this is with groupBy followed by applyInPandas, which runs a pandas function once per group.
import numpy as np
import pandas as pd
import pyspark.sql.functions as F
from sklearn.linear_model import LinearRegression

# Schema of the DataFrame returned for each group
schema_exp_delta = "product string, delta_exp_pred double"

def exponential_regression(pdf: pd.DataFrame) -> pd.DataFrame:
    # pdf holds all rows of one product as a plain pandas DataFrame;
    # replace x_col / y_col with your real feature and target columns
    X, Y = pdf[["x_col"]], pdf["y_col"]
    exp = LinearRegression().fit(X, np.log(Y))
    # predicting on the group's own features here; use your X_test instead if
    # you can make it available on the workers (e.g. via a broadcast variable)
    exp_pred = np.exp(exp.predict(X))
    # the function must return a pandas DataFrame matching the declared schema;
    # you cannot use the SparkSession or Window functions inside it
    return pd.DataFrame({"product": pdf["product"].iloc[0],
                         "delta_exp_pred": exp_pred})

# Keep only the discounts used for training, then fit one model per product
df = df.filter(F.col("pvp_discount").isin([0.2, 0.5, 0.75]))
preds = df.groupBy("product").applyInPandas(exponential_regression, schema_exp_delta)
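Each group is shipped to a worker as a pandas DataFrame, so all products are fitted in parallel instead of sequentially on the driver. To sanity-check the shape of the result, here is a minimal, self-contained demo with made-up numbers (x_col / y_col are placeholder names, as above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
toy = spark.createDataFrame(
    [("apples", 1.0, 2.7), ("apples", 2.0, 7.4), ("apples", 3.0, 20.1),
     ("pears", 1.0, 1.3), ("pears", 2.0, 1.8), ("pears", 3.0, 2.4)],
    ["product", "x_col", "y_col"],
)
toy.groupBy("product").applyInPandas(exponential_regression, schema_exp_delta).show()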