I am rewriting a reinforcement learning framework from serial code execution to parallel (multiprocessing) to reduce training time. It works, but after a few hours of training a MemoryError is thrown. I tried adding gc.collect() after each loop, with no change.
Here's the for loop that uses multiprocessing:
for episode in episodes:
    env.episode = episode
    flex_list = [0, 1, 2]
    # give every machine a random flexibility plan for this episode
    for machine in env.list_of_machines:
        flex_plan = []
        for time_step in range(0, env.steplength):
            flex_plan.append(random.choice(flex_list))
        machine.flex_plan = flex_plan
    env.current_step = 0
    steps = []
    state = env.reset(restricted=True)
    steps.append(state)
    # multiprocessing part, with a condition to use a specific number of CPUs or 'all' of them
    ####################################################
    func_part = partial(parallel_pool, episode=episode, episodes=episodes, env=env, agent=agent,
                        state=state, log_data_qvalues=log_data_qvalues, log_data=log_data, steps=steps)
    if CPUs_used == 'all':
        mp.Pool().map(func_part, range(env.steplength - 1))
    else:
        mp.Pool(CPUs_used).map(func_part, range(env.steplength - 1))
    ############################################################
    # the model is saved periodically, not only at the end
    save_interval = 100  # episode interval at which to save models
    if (episode + 1) % save_interval == 0:
        agent.save_model(f'models/model_{filename}_{episode + 1}')
        print(f'model saved at episode {episode + 1}')
    plt.close()
    gc.collect()
Output after 26 episodes of training:
Episode: 26/100 Action: 1/11 Phase: 3/3 Measurement Count: 231/234 THD fake slack: 0.09487 Psoll: [0.02894068 0.00046048 0. 0. ] Laptime: 0.181
Episode: 26/100 Action: 1/11 Phase: 3/3 Measurement Count: 232/234 THD fake slack: 0.09488 Psoll: [0.02894068 0.00046048 0. 0. ] Laptime: 0.181
Episode: 26/100 Action: 1/11 Phase: 3/3 Measurement Count: 233/234 THD fake slack: 0.09489 Psoll: [0.02894068 0.00046048 0. 0. ] Laptime: 0.179
Traceback (most recent call last):
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 87, in <module>
    main()
  File "C:/Users/Artur/Desktop/RL_framework/train.py", line 77, in main
    duration = cf.training(episodes, env, agent, filename, topology=topology, multi_processing=multi_processing, CPUs_used=CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 166, in training
    save_interval = parallel_training(range(episodes), env, agent, log_data_qvalues, log_data, filename, CPUs_used)
  File "C:\Users\Artur\Desktop\RL_framework\help_functions\custom_functions.py", line 81, in parallel_training
    mp.Pool().map(func_part, range(env.steplength-1))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 657, in get
    raise self._value
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\pool.py", line 431, in _handle_tasks
    put(task)
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Users\Artur\Anaconda\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
MemoryError
Is there a way to fix this?
Since you create a new Pool inside the loop, I believe your memory is choked because the pools (and their worker processes) are left hanging around after they finish running.
From the documentation:
Warning: multiprocessing.pool objects have internal resources that need to be properly managed (like any other resource) by using the pool as a context manager or by calling close() and terminate() manually. Failure to do this can lead to the process hanging on finalization. Note that it is not correct to rely on the garbage collector to destroy the pool as CPython does not assure that the finalizer of the pool will be called (see object.__del__() for more information).
I suggest you try to refactor your code a little:
# set CPUs_used to a desired number, or to None to use all available CPUs
with mp.Pool(processes=CPUs_used) as p:
    p.map(func_part, range(env.steplength - 1))
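The with block calls terminate() on the pool as soon as the map() returns, so each episode's worker processes are torn down before the next pool is created.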
Or you can call .close() and .join() manually, whichever suits your coding style best.
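A minimal sketch of that manual variant, reusing the func_part and env names from your loop:

pool = mp.Pool(processes=CPUs_used)  # None means use all available CPUs
try:
    pool.map(func_part, range(env.steplength - 1))
finally:
    pool.close()  # no more tasks will be submitted to the pool
    pool.join()   # wait for the workers to exit and free their resources

Either way, the key point is that every pool is closed and joined before the next episode starts, so its workers (and everything pickled over to them) can actually be released.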