Tags: apache-spark, pyspark, apache-spark-mllib, apache-spark-ml

Why exactly should we avoid using for loops in PySpark?


I was trying to speed up some of my pipelines but couldn't find a precise answer. Are some for loops OK, depending on the implementation? When is it OK to use a loop without taking too much of a performance hit? I've read:

  1. This nice article by David Mudrauskas
  2. This nice Stack Overflow answer
  3. The Spark RDD docs, which advise:

In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that’s just by accident and such code will not behave as expected in distributed mode.
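For example (a minimal sketch of the pitfall the docs describe, not code from my pipeline), mutating a driver-side variable from inside a closure silently does nothing in distributed mode, because each executor works on its own copy:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

counter = 0
rdd = sc.parallelize(range(10))

def increment_counter(x):
    global counter
    counter += x  # mutates a copy shipped to the executor, not the driver's variable

rdd.foreach(increment_counter)
print(counter)  # still 0 in cluster mode; an Accumulator is the supported alternative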

If we were to use a for loop to step through and train a series of models, persisting them in the models dict,


from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression


dv = ['y1', 'y2', 'y3', ...]

models = {}

for v in dv:
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    model = LogisticRegression(featuresCol='features', labelCol=v,
                               predictionCol=f'prediction_{v}')
    pipeline = Pipeline(stages=[assembler, model])
    pipe = pipeline.fit(train)
    models[v] = pipe



Would that be meaningfully slower than enumerating and training each one explicitly, like below? Are the two approaches equivalent?

# y1
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
model = LogisticRegression(featuresCol='features', labelCol='y1',
                           predictionCol='prediction_y1')
pipeline = Pipeline(stages=[assembler, model])
pipe = pipeline.fit(train)
models['y1'] = pipe

# y2
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
model = LogisticRegression(featuresCol='features', labelCol='y2',
                           predictionCol='prediction_y2')
pipeline = Pipeline(stages=[assembler, model])
pipe = pipeline.fit(train)
models['y2'] = pipe

# y3
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
model = LogisticRegression(featuresCol='features', labelCol='y3',
                           predictionCol='prediction_y3')
pipeline = Pipeline(stages=[assembler, model])
pipe = pipeline.fit(train)
models['y3'] = pipe

...

My understanding is that Spark ML has parallelism built in, but I'm wondering whether the loop degrades that parallelism, and whether there is a better way to train the models in parallel. It's very slow on my setup, so maybe I'm doing something wrong... Thanks in advance!


Solution

  • Both approaches are equivalent. Either way, the degree of parallelism depends on the number of cores you have across your executors. You can read more in this article: https://www.javacodegeeks.com/2018/10/anatomy-apache-spark-job.html
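  • Not part of the original answer, but as a rough sketch: if the goal is to overlap the independent fit() calls rather than run them strictly one after another, one common pattern is to launch them from a thread pool on the driver, since each fit() just submits ordinary Spark jobs that the scheduler can run concurrently. This assumes the `train`, `feature_cols`, and `dv` names from the question are already defined:

from concurrent.futures import ThreadPoolExecutor

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import VectorAssembler

def fit_for_label(v):
    # Build and fit one pipeline per target column; each fit() runs as normal Spark jobs.
    assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
    lr = LogisticRegression(featuresCol='features', labelCol=v,
                            predictionCol=f'prediction_{v}')
    return v, Pipeline(stages=[assembler, lr]).fit(train)

# The thread pool only overlaps the driver-side fit() calls so their jobs can be
# scheduled at the same time; the cores available across the executors are still the limit.
with ThreadPoolExecutor(max_workers=4) as pool:
    models = dict(pool.map(fit_for_label, dv))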