Tags: python, optimization, multiprocessing, python-multiprocessing

Python multiprocessing error when trying to run a process pool from within a spawned process


I have a project where I start 4 processes to query different databases.

self.counter_puller.start() 
self.counter_puller.join()

This happens in the file that contains main. Within the CountersPuller process I call calculate_kpis, which is a method of class Calculator(object):

class CountersPuller(Process):
    _db = None
    _logger = None
    _sites_info = None

    def __init__(self, config, output_dir, area, level):
        Process.__init__(self)
        ...

    def run(self):
        ...
        calculator.calculate_kpis(self._pulled_counters)

And in that method I have the process pool:

with Pool(globals.NUM_KPI_WORKERS) as exe:
    results = exe.starmap(worker_calcuiating_kpi, zip(self._kpis, counters))
for res in results:
    calculated_kpis[res.columns[0]] = res.values

When I test this part of the code (calculate_kpis()) from a unit test, everything works fine, but when I run the full program I get an error while the process pool is being initialized:

Process NrPmDataCountersPuller-1:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/var/www/html/kpi-analyzer/src/dbAggregator/counters_puller.py", line 135, in run
    calculator.calculate_kpis(self._pulled_counters)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 268, in calculate_kpis
    self._generate_and_save_kpi_carrier_level(input_data)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 280, in _generate_and_save_kpi_carrier_level
    calculated_kpis = self.__calculate_carrier_level(input_data)
  File "/var/www/html/kpi-analyzer/src/dbAggregator/kpi_calculator.py", line 349, in __calculate_carrier_level
    with Pool(globals.NUM_KPI_WORKERS) as exe:
  File "/usr/lib/python3.10/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 191, in __init__
    self._setup_queues()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 346, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/queues.py", line 341, in __init__
    self._rlock = ctx.Lock()
  File "/usr/lib/python3.10/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 58, in __init__
    kind, value, maxvalue, self._make_name(),
  File "/usr/lib/python3.10/multiprocessing/synchronize.py", line 116, in _make_name
    return '%s-%s' % (process.current_process()._config['semprefix'],
TypeError: 'Config' object is not subscriptable

Any hint that could help me move forward would be appreciated.


Solution

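  • You have a _config member in your CountersPuller class, and it clashes with the _config member of Python's Process class. multiprocessing keeps its own per-process settings in that attribute (a dict with entries such as 'semprefix' and 'authkey'). Inside the child process, current_process() returns your CountersPuller object, so when the Pool in run() creates its first lock, _make_name() reads current_process()._config['semprefix'], finds your Config object instead of a dict, and raises the TypeError. This is also why calculate_kpis() works from a unit test: there it runs in the main process, whose _config is untouched.

    Avoid assigning to or using the attribute name _config in this class (rename it, for example to _app_config), or drop the inheritance altogether and pass your configuration as an argument to the function the process runs.

    You can easily reproduce the bug with the following code: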

    import multiprocessing
    
    class MyProcess(multiprocessing.Process):
    
        def __init__(self):
            super().__init__()
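            self._config = None  # overwrites the dict multiprocessing keeps in Process._config
    
        def run(self):
            with multiprocessing.Pool() as p:  # raises TypeError: 'NoneType' object is not subscriptable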
                pass
    
    if __name__ == "__main__":
        proc = MyProcess()
        proc.start()
        proc.join()
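To see what the clash actually breaks, the sketch below inspects the attribute on a freshly created Process and then reproduces the same error message with a stand-in class. It relies on `_config`, a private CPython implementation detail that may change between versions; `Config`, `inspect_process_config` and `subscript_error_message` are hypothetical names used only for illustration.

```python
import multiprocessing


class Config:
    # Hypothetical stand-in for the asker's settings class: it defines no
    # __getitem__, so indexing it fails like Config in the traceback.
    pass


def inspect_process_config():
    # Private CPython detail: every Process copies its parent's _config
    # dict, which holds entries such as 'authkey' and 'semprefix'.
    proc = multiprocessing.Process()
    return proc._config


def subscript_error_message():
    # What _make_name() effectively does once _config is not a dict.
    try:
        Config()["semprefix"]
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    config = inspect_process_config()
    print(type(config).__name__)      # dict
    print("semprefix" in config)      # True
    print(subscript_error_message())  # 'Config' object is not subscriptable
```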
    

    That's one more item for the list of reasons to favor composition over inheritance. This is how you do it without inheritance:

    import multiprocessing
    
    class MyProcess:
    
        def __init__(self):
            self._config = "hello"
    
        def do_something(self):
            print(self._config)
            with multiprocessing.Pool() as p:
                pass
    
    if __name__ == "__main__":
        proc_config = MyProcess()
        proc = multiprocessing.Process(target=proc_config.do_something)
        proc.start()
        proc.join()
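If you would rather keep the Process subclass, renaming the attribute should be enough. This is a minimal sketch under that assumption: `_app_config`, `square` and `run_demo` are hypothetical names, and the `Queue` is there only to send the pool's results back to the parent.

```python
import multiprocessing


def square(x):
    # Module-level function so it can be pickled and sent to pool workers.
    return x * x


class MyProcess(multiprocessing.Process):

    def __init__(self, config, queue):
        super().__init__()
        # Hypothetical attribute name: anything except _config avoids
        # overwriting the dict that multiprocessing keeps in Process._config.
        self._app_config = config
        self._queue = queue

    def run(self):
        # current_process()._config is still a dict here, so the pool's
        # locks and queues are created without the TypeError.
        with multiprocessing.Pool(2) as pool:
            results = pool.map(square, range(self._app_config["n"]))
        self._queue.put(results)


def run_demo():
    queue = multiprocessing.Queue()
    proc = MyProcess({"n": 4}, queue)
    proc.start()
    # Read the result before join() so the child can flush its queue.
    results = queue.get()
    proc.join()
    return results


if __name__ == "__main__":
    print(run_demo())  # [0, 1, 4, 9]
```

Keeping the entry point under `if __name__ == "__main__":` matters here because the spawn and forkserver start methods re-import the main module in child processes.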