Tags: python, python-3.x, multithreading, python-multithreading

Python 3: How to spawn jobs in parallel


I am pretty new to multithreading and would like to explore it. I have a JSON file that provides some config, and based on it I need to kick off some processing. Here is the config:

{
    "job1":{
        "param1":"val1",
        "param2":"val2"
    },
    "job2":{
        "param3":"val3",
        "param4":"val4"
    }
}

and here is the Python snippet:

import json

with open('config.json') as config_file:
    config_data = json.load(config_file)

for job_name, job_atts in config_data.items():
    perform_job(job_name, job_atts)

This way, I can finish the jobs one by one.

Is there a way to run/kick off these jobs in parallel? Note that the jobs are completely independent of each other and do not need to be performed in sequence.

How can I achieve parallel runs in Python?

Update

Here is what I tried:

    >>> import json
    >>> from multiprocessing import Pool
    >>> 
    >>> config_data = json.loads(''' {
    ...     "job1":{
    ...         "param1":"val1",
    ...         "param2":"val2"
    ...     },
    ...     "job2":{
    ...         "param3":"val3",
    ...         "param4":"val4"
    ...     }
    ... }''')
    >>> def perform_job(job_name,job_atts):
    ...     print(job_name)
    ...     print(job_atts)
    ... 
    >>> args = [(name, attrs)
    ...         for name, attrs in config_data.items()]
    >>> 
    >>> with Pool() as pool:
    ...     pool.starmap(perform_job, args)
    ... 
    Process SpawnPoolWorker-27:
    Process SpawnPoolWorker-24:
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 114, in worker
        task = get()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 368, in get
        return _ForkingPickler.loads(res)
    AttributeError: Can't get attribute 'perform_job' on <module '__main__' (built-in)>
    Traceback (most recent call last):
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 315, in _bootstrap
        self.run()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/pool.py", line 114, in worker
        task = get()
      File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/multiprocessing/queues.py", line 368, in get
        return _ForkingPickler.loads(res)
    AttributeError: Can't get attribute 'perform_job' on <module '__main__' (built-in)>
    

But I am still getting the error.
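
A quick check in the same session shows which start method is in use. On macOS, Python 3.8+ defaults to spawn, which launches each worker as a fresh interpreter that re-imports __main__, so the workers cannot see a function that exists only inside the REPL:

    >>> import multiprocessing
    >>> multiprocessing.get_start_method()
    'spawn'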


Solution

  • You are looking for the multiprocessing module. Use a process pool to iterate over many jobs.

    Here is an example source file that runs correctly when executed as $ python spawn.py. Putting the main code within a def main(): function is nice but not critical. Protecting it with an if __name__ == "__main__": clause is quite important, since child interpreters will re-parse the source file; see "Safe importing of main module" in the multiprocessing docs. This is also why the interactive attempt above failed: a REPL session has no source file from which the spawned workers can re-import perform_job. (Notice that the "single core" test won't run within the children.) The most relevant lines are the last two.

    #! /usr/bin/env python
    from multiprocessing import Pool
    import json
    
    config_data = json.loads(
        """ {
        "job1":{
            "param1":"val1",
            "param2":"val2"
        },
        "job2":{
            "param3":"val3",
            "param4":"val4"
        }
    } """
    )
    args = list(config_data.items())
    
    
    def perform_job(job_name, job_atts):
        print(job_name, job_atts)
    
    
    if __name__ == "__main__":
    
        # Single core:
        perform_job(*args[0])
        perform_job(*args[1])
        print()
    
        # Multi core:
        with Pool() as pool:
            pool.starmap(perform_job, args)
    
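    As an aside: if the jobs spend most of their time waiting on I/O rather than burning CPU, a thread pool sidesteps the pickling requirement entirely, since threads share the interpreter and no arguments are shipped between processes; it even works from the REPL. A minimal sketch, assuming the same config_data and perform_job as above:

    from concurrent.futures import ThreadPoolExecutor

    with ThreadPoolExecutor() as executor:
        # One task per job; nothing is pickled, so this also works
        # for functions defined interactively.
        futures = [executor.submit(perform_job, name, atts)
                   for name, atts in config_data.items()]
        for future in futures:
            future.result()  # re-raises any exception from a job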

    See the starmap documentation: https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap
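
    For completeness, the same process-based fan-out can be written with concurrent.futures. ProcessPoolExecutor has no starmap, but Executor.map accepts one iterable per positional argument, like the built-in map. A sketch under the same assumptions as spawn.py above (run as a script, behind the __main__ guard):

    from concurrent.futures import ProcessPoolExecutor

    if __name__ == "__main__":
        names = [name for name, _ in args]
        attrs = [atts for _, atts in args]
        # map(fn, iter1, iter2) calls fn(name, attrs) for each pair.
        with ProcessPoolExecutor() as executor:
            list(executor.map(perform_job, names, attrs))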