I have a very wide dataframe with label columns. I want to run a logistic regression for each column independently. I'm trying to find the most efficient way to run this in parallel.
+----------+--------+--------+--------+-----+------------+
| features | label1 | label2 | label3 | ... | label30000 |
+----------+--------+--------+--------+-----+------------+
My initial thought was to use ThreadPoolExecutor, get the result for each column, and join:
from concurrent import futures

from pyspark.ml.classification import LogisticRegression
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Pull the probability of the positive class out of the probability vector.
extract_prob = udf(lambda x: float(x[1]), FloatType())

def lr_for_column(argm):
    col_name, test_res = argm
    lr = LogisticRegression(featuresCol="features", labelCol=col_name, regParam=0.1)
    lrModel = lr.fit(tfidf)              # tfidf: training DataFrame
    res = lrModel.transform(test_tfidf)  # test_tfidf: test DataFrame
    test_res = test_res.join(res.select("id", "probability"), on="id")
    test_res = test_res.withColumn(col_name, extract_prob("probability")).drop("probability")
    return test_res.select("id", col_name)

with futures.ThreadPoolExecutor(max_workers=100) as executor:
    future_results = [executor.submit(lr_for_column, [colname, test_res])
                      for colname in list_of_label_columns]
    futures.wait(future_results)

for future in future_results:
    test_res = test_res.join(future.result(), on="id")
But this method is not very performant. Is there a faster way to do this?
Taking into account the available resources, you have nothing to gain by using ThreadPoolExecutor - with 32 cores in total and 200 partitions, you can run only 32 of the 200 tasks at a time, i.e. process only ~16% of your data concurrently, and this fraction can only get worse as the data grows.
If you want to train 30000 models with the default number of iterations (100, probably too low in practice), your Spark program will submit around 3 000 000 jobs (30000 models × 100 iterations, since each iteration creates a separate job), and only a fraction of each can be processed concurrently - this doesn't give much hope for improvement unless you add more resources.
Despite that, there are some things you can try:
If you decide to reduce dimensionality, also consider sampling to further reduce the size of your training data and, consequently, the number of partitions, increasing overall throughput. If there are strong linear trends in your data, they should be visible even on a smaller sample, without significant loss of precision.
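For example, a minimal sketch (assuming tfidf is the training DataFrame from the question, 32 total cores, and a 10% sample rate picked purely for illustration):

# Sample a fraction of the training rows, then shrink the partition count
# to match the cluster, so each model's jobs run far fewer tasks.
sample = (tfidf
    .sample(withReplacement=False, fraction=0.1, seed=42)
    .coalesce(32)
    .cache())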
Consider replacing the expensive pyspark.ml algorithm with a variant that doesn't require multiple jobs, for example using some combination of tools from spark-sklearn (you could create an ensemble model by fitting a sklearn model on each partition), as sketched below.
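spark-sklearn has its own APIs, but the per-partition ensemble idea can be sketched with plain RDD operations. This is only an illustration, not the spark-sklearn API: it assumes each partition's rows fit in executor memory and uses label1 as an example label.

import numpy as np
from sklearn.linear_model import LogisticRegression as SkLogisticRegression

def fit_partition(col_name):
    def fit(rows):
        X, y = [], []
        for row in rows:
            X.append(row["features"].toArray())  # Spark ML vector -> numpy
            y.append(row[col_name])
        if len(set(y)) < 2:  # empty or single-class partition: nothing to fit
            return iter([])
        return iter([SkLogisticRegression().fit(np.array(X), np.array(y))])
    return fit

# One local sklearn model per partition; averaging their predicted
# probabilities afterwards gives an ensemble prediction for this label.
models = (tfidf.select("features", "label1").rdd
    .mapPartitions(fit_partition("label1"))
    .collect())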
Oversubscribe cores. For example, if you have 4 physical cores per node, allow 8 or 16 to account for IO wait time.
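A sketch of the oversubscription idea; the exact knob depends on your cluster manager, and the numbers are illustrative:

from pyspark.sql import SparkSession

# On nodes with 4 physical cores, advertise 8 task slots per executor so
# tasks blocked on IO don't leave CPUs idle.
spark = (SparkSession.builder
    .config("spark.executor.cores", "8")
    .getOrCreate())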