I'm starting to use Python for forecasting, coming from R, and I ran into this problem while trying to estimate multiple models with multiprocessing. I'm trying to estimate the same model (ARIMA) with multiple combinations of parameters, and I managed to do it with a for-loop. However, when I tried to use multiprocessing to make it faster, I ran into some issues.
This is the function I'm trying to run:
```python
import numpy as np
from statsmodels.tsa.arima.model import ARIMA  # assumed: statsmodels' ARIMA class

# training_set and test_set are defined elsewhere (not shown)
arima_comp = []

def arima_tester(p, q, P, Q):
    mod = ARIMA(training_set, order=(p, 1, q), seasonal_order=(P, 1, Q, 12))
    modfit = mod.fit()
    forecast = modfit.forecast(steps=len(test_set))
    error = forecast - test_set
    rmse = np.square(error).mean() ** 0.5
    aic = modfit.aic
    model = "ARIMA(" + str(p) + ",1," + str(q) + ")" + "(" + str(P) + ",1," + str(Q) + ")[12]: "
    arima_comp.append(
        {
            "Modelo": model,
            "AIC": aic,
            "RMSE": rmse
        }
    )
```
The idea was to estimate multiple combinations of models and append all of them to the `arima_comp` list, so I could compare them afterwards. It worked fine with the for-loop, but not when I used `concurrent.futures.ProcessPoolExecutor.map` as follows:
```python
import concurrent.futures

pq_aux = [0, 1, 2, 3]
PQ_aux = [0, 1]

with concurrent.futures.ProcessPoolExecutor() as executor:
    executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux)
```
It seems as if the function is never executed, because when I inspect the `arima_comp` list afterwards, it is empty:
```python
arima_comp
# Returns []
```
Does anyone have an idea what I am doing wrong? I'm not getting an error when I run this code; I just don't get the expected result. Thank you in advance.
I expected the same result as running a for-loop over different combinations of `p`, `q`, `P`, `Q` in the `arima_tester` function: each call would append to the `arima_comp` list, giving me something like this:
```python
arima_tester(0,0,0,0)
arima_tester(0,0,0,1)
arima_tester(0,0,1,1)
arima_comp
# Returns:
# [{'Modelo': 'ARIMA(0,1,0)(0,1,0)[12]: ',
#   'AIC': 1198.2900239812564,
#   'RMSE': 6.653975098794921},
#  {'Modelo': 'ARIMA(0,1,0)(0,1,1)[12]: ',
#   'AIC': 1127.5935159029652,
#   'RMSE': 7.400152967890038},
#  {'Modelo': 'ARIMA(0,1,0)(1,1,1)[12]: ',
#   'AIC': 1125.0164173629523,
#   'RMSE': 7.996184118574912}]
```
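A side note on the `executor.map` call in the question: like the built-in `map`, `Executor.map(fn, *iterables)` takes one element from each iterable per call and stops at the shortest iterable. With `pq_aux, pq_aux, PQ_aux, PQ_aux` that means only two calls, `(0,0,0,0)` and `(1,1,1,1)`, not every combination. The sketch below uses the built-in `map` (same pairing rule, no processes involved) with a hypothetical `describe` function standing in for `arima_tester`, and `itertools.product` to build the full grid.

```python
import itertools

pq_aux = [0, 1, 2, 3]
PQ_aux = [0, 1]


def describe(p, q, P, Q):
    """Hypothetical stand-in for arima_tester: only labels the combination."""
    return f"ARIMA({p},1,{q})({P},1,{Q})[12]"


# map() (and Executor.map) zips its iterables and stops at the shortest one,
# so only two (p, q, P, Q) tuples are generated here.
zipped = list(map(describe, pq_aux, pq_aux, PQ_aux, PQ_aux))
print(zipped)  # ['ARIMA(0,1,0)(0,1,0)[12]', 'ARIMA(1,1,1)(1,1,1)[12]']

# itertools.product yields every combination: 4 * 4 * 2 * 2 = 64 tuples.
grid = list(itertools.product(pq_aux, pq_aux, PQ_aux, PQ_aux))
print(len(grid))  # 64

# zip(*grid) transposes the tuples into four parallel sequences
# (all p values, all q values, ...) that map() can consume.
all_models = list(map(describe, *zip(*grid)))
print(all_models[:2])  # ['ARIMA(0,1,0)(0,1,0)[12]', 'ARIMA(0,1,0)(0,1,1)[12]']
```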
UPDATE
I just followed the answers below, and now I'm getting a "BrokenProcessPool" error. A minimal, reproducible example that leads (at least for me) to the very same error can be found below:
```python
import concurrent.futures

def function(a,b,c,d):
    sum = a+b+c+d
    halfsum = sum/2
    squaredsum = a ** 2 + b ** 2 + c ** 2 + d ** 2
    return {
        "Sum":sum,
        "Sum Divided by 2":halfsum,
        "Squared Sum": squaredsum
    }

ab_values = [0,1,2,3]
cd_values = [0,1]

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))
```
Whenever I run this code I get the following error:
```
---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
Cell In[4], line 6
      4 if __name__ == '__main__':
      5     with concurrent.futures.ProcessPoolExecutor() as executor:
----> 6         final_list = list(executor.map(function, ab_values, ab_values, cd_values, cd_values))

File c:\Users\lfval\anaconda3\lib\concurrent\futures\process.py:570, in _chain_from_iterable_of_lists(iterable)
    564 def _chain_from_iterable_of_lists(iterable):
    565     """
    566     Specialized implementation of itertools.chain.from_iterable.
    567     Each item in *iterable* should be a list.  This function is
    568     careful not to keep references to yielded objects.
    569     """
--> 570     for element in iterable:
    571         element.reverse()
    572         while element:

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:621, in Executor.map.<locals>.result_iterator()
    618 while fs:
    619     # Careful not to keep a reference to the popped future
    620     if timeout is None:
--> 621         yield _result_or_cancel(fs.pop())
    622     else:
    623         yield _result_or_cancel(fs.pop(), end_time - time.monotonic())

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:319, in _result_or_cancel(***failed resolving arguments***)
    317 try:
    318     try:
--> 319         return fut.result(timeout)
    320     finally:
    321         fut.cancel()

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:458, in Future.result(self, timeout)
    456         raise CancelledError()
    457     elif self._state == FINISHED:
--> 458         return self.__get_result()
    459     else:
    460         raise TimeoutError()

File c:\Users\lfval\anaconda3\lib\concurrent\futures\_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None
```
I'm trying to run this code on Windows 10 using Python 3.10.9.
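The `Cell In[4]` line in the traceback suggests the code is running in a Jupyter/IPython session. The `concurrent.futures` documentation states that the `__main__` module must be importable by worker subprocesses, so `ProcessPoolExecutor` does not work in the interactive interpreter. On Windows, workers are started with the `spawn` method and cannot look up a function defined in a notebook cell, and the pool breaks. A minimal sketch of the same example as a standalone script, assuming it is saved to a file (the name `mre.py` is hypothetical) and run with `python mre.py`:

```python
import concurrent.futures


def function(a, b, c, d):
    """Same toy computation as in the question."""
    total = a + b + c + d  # renamed from `sum` to avoid shadowing the built-in
    return {
        "Sum": total,
        "Sum Divided by 2": total / 2,
        "Squared Sum": a ** 2 + b ** 2 + c ** 2 + d ** 2,
    }


ab_values = [0, 1, 2, 3]
cd_values = [0, 1]

# The guard stops spawned worker processes (which re-import this file)
# from creating pools of their own.
if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        final_list = list(
            executor.map(function, ab_values, ab_values, cd_values, cd_values)
        )
    print(final_list)
```

To stay inside a notebook, the usual workaround is to move `function` (or `arima_tester`) into its own `.py` module next to the notebook and import it from there, so the worker processes can import it too.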
The list you are trying to append to, `arima_comp`, is defined at global scope. When you create your process pool, each process runs in its own address space (one of the characteristics that distinguishes multiprocessing from multithreading) and appends to its own copy of `arima_comp`. That is, this list is not shared across processes. Thus, when all the tasks submitted to the pool complete, the copy of `arima_comp` in the main process's address space is still empty, since it was never modified.
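A small sketch that reproduces this behaviour without ARIMA: the hypothetical `append_square` worker appends to a module-level list and also returns its result. After the pool finishes, the parent's list is still empty, while the returned values do arrive in the parent, in input order.

```python
import concurrent.futures

# Module-level list: every worker process ends up with its own independent copy.
results = []


def append_square(x):
    """Append to this process's copy of `results` and also return the value."""
    results.append(x * x)
    return x * x


if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        returned = list(executor.map(append_square, [1, 2, 3]))
    print(results)   # prints [] : the workers never touched the parent's list
    print(returned)  # prints [1, 4, 9] : return values travel back to the parent
```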
The simplest solution is to:

1. Remove `arima_comp` altogether and have `arima_tester` return the dictionary it was previously appending to `arima_comp`:

```python
def arima_tester(p, q, P, Q):
    ...  # Unmodified code omitted for brevity
    return {
        "Modelo": model,
        "AIC": aic,
        "RMSE": rmse
    }
```
2. The `ProcessPoolExecutor.map` function you are calling actually returns an iterator that yields the successive return values of `arima_tester`. Thus you just need the following minor code change:

```python
with concurrent.futures.ProcessPoolExecutor() as executor:
    arima_comp = list(executor.map(arima_tester, pq_aux, pq_aux, PQ_aux, PQ_aux))
```
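Putting both changes together with the full parameter grid, a sketch might look like this. `score_model`, `build_grid` and `run_grid` are hypothetical names; `score_model` stands in for `arima_tester` and returns a placeholder value instead of fitting a model, so the example runs without `statsmodels` or the question's data. Because `executor.map` pairs its iterables element-wise, the combinations come from `itertools.product` and are transposed with `zip(*grid)` into one sequence per parameter.

```python
import concurrent.futures
import itertools


def score_model(p, q, P, Q):
    """Hypothetical stand-in for arima_tester.

    The real function would fit ARIMA(p,1,q)(P,1,Q)[12] and compute AIC/RMSE;
    here a placeholder is returned so the sketch runs on its own.
    """
    model = f"ARIMA({p},1,{q})({P},1,{Q})[12]"
    # Placeholder metric: count of AR/MA coefficients, not a real AIC.
    return {"Modelo": model, "n_params": p + q + P + Q}


def build_grid(pq_values, PQ_values):
    """Every (p, q, P, Q) combination, as a list of tuples."""
    return list(itertools.product(pq_values, pq_values, PQ_values, PQ_values))


def run_grid(grid):
    """Evaluate each combination in a worker process and collect the dicts."""
    # zip(*grid) turns the list of tuples into four parallel sequences,
    # which is the argument layout Executor.map expects.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        return list(executor.map(score_model, *zip(*grid)))


if __name__ == "__main__":
    arima_comp = run_grid(build_grid([0, 1, 2, 3], [0, 1]))
    print(len(arima_comp))  # 64, one dict per combination
    print(arima_comp[0])
```

With the placeholder replaced by the ARIMA fit and AIC/RMSE computation from `arima_tester`, something like `sorted(arima_comp, key=lambda row: row["AIC"])` then gives the comparison the question is after.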