
Python: subclassing multiprocessing.Process


I am new to object-oriented Python and I am rewriting my existing application as an object-oriented version, because the number of developers is growing and my code is becoming unmaintainable.

Normally I use multiprocessing queues, but I found from this example http://www.doughellmann.com/PyMOTW/multiprocessing/basics.html that I can subclass multiprocessing.Process. That seemed like a good idea, so I wrote a class to test it:

code:

from multiprocessing import Process

class Processor(Process):
    def return_name(self):
        return "Process %s" % self.name

    def run(self):
        return self.return_name()

processes = []

if __name__ == "__main__":
    for i in range(0, 5):
        p = Processor()
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

However, I cannot get the values back. How can I use queues in this way?

EDIT: I want to get the return value, and I am thinking about where to put the Queue().


Solution

  • Subclassing multiprocessing.Process:

    However, I cannot get the values back. How can I use queues in this way?

    Process needs a Queue() to receive the results... An example of how to subclass multiprocessing.Process follows...

    from multiprocessing import Process, Queue
    class Processor(Process):
    
        def __init__(self, queue, idx, **kwargs):
            super(Processor, self).__init__()
            self.queue = queue
            self.idx = idx
            self.kwargs = kwargs
    
        def run(self):
            """Build some CPU-intensive tasks to run via multiprocessing here."""
            hash(frozenset(self.kwargs.items())) # Shameless usage of CPU for no gain...
    
            ## Return some information back through multiprocessing.Queue
            ## NOTE: self.name is an attribute of multiprocessing.Process
            self.queue.put("Process idx={0} is called '{1}'".format(self.idx, self.name))
    
    if __name__ == "__main__":
        NUMBER_OF_PROCESSES = 5
    
        ## Create a list to hold running Processor object instances...
        processes = list()
    
        q = Queue()  # Build a single queue to send to all process objects...
        for i in range(0, NUMBER_OF_PROCESSES):
            p = Processor(queue=q, idx=i)
            p.start()
            processes.append(p)
    
        # Incorporating ideas from this answer, below...
        #    https://stackoverflow.com/a/42137966/667301
        for proc in processes:
            proc.join()
        # NOTE: q.empty() works here, but see the caveat after the output below...
        while not q.empty():
            print("RESULT: {0}".format(q.get()))   # get results from the queue...
    

    On my machine, this results in...

    $ python test.py
    RESULT: Process idx=0 is called 'Processor-1'
    RESULT: Process idx=4 is called 'Processor-5'
    RESULT: Process idx=3 is called 'Processor-4'
    RESULT: Process idx=1 is called 'Processor-2'
    RESULT: Process idx=2 is called 'Processor-3'
    $
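
    One caveat worth noting: multiprocessing.Queue.empty() is not reliable in
    general, so when each process puts exactly one result, it is safer to drain
    a known count of results with q.get() before joining. A minimal
    self-contained sketch of that pattern, using a hypothetical worker()
    function instead of the Processor class above...

    from multiprocessing import Process, Queue

    def worker(queue, idx):
        # Hypothetical stand-in for Processor.run(); puts exactly one result...
        queue.put("result from process idx={0}".format(idx))

    if __name__ == "__main__":
        q = Queue()
        processes = [Process(target=worker, args=(q, i)) for i in range(5)]
        for p in processes:
            p.start()
        # Drain one result per process *before* joining; this avoids trusting
        # q.empty() and avoids blocking a child on a queue that is never read...
        for _ in processes:
            print("RESULT: {0}".format(q.get()))
        for p in processes:
            p.join()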
    

  • Using multiprocessing.Pool:

    FWIW, one disadvantage I've found to subclassing multiprocessing.Process is that you can't leverage all the built-in goodness of multiprocessing.Pool. Pool gives you a very nice API if you don't need your producer and consumer code to talk to each other through a queue.

    You can do a lot just with some creative return values... in the following example, I use a dict() to encapsulate input and output values from pool_job()...

    from multiprocessing import Pool

    def pool_job(input_val=0):
        # NOTE: Pool.map() returns results in input order; the dict format
        # {input: output} keeps each result self-describing anyway...
        return {'pool_job(input_val={0})'.format(input_val): int(input_val)*12}

    if __name__ == "__main__":
        pool = Pool(5)  # Use 5 multiprocessing processes to handle jobs...
        results = pool.map(pool_job, range(0, 12))  # map range(0, 12) into pool_job()
        pool.close()
        pool.join()
        print(results)
    

    This results in:

    [
        {'pool_job(input_val=0)': 0}, 
        {'pool_job(input_val=1)': 12}, 
        {'pool_job(input_val=2)': 24}, 
        {'pool_job(input_val=3)': 36}, 
        {'pool_job(input_val=4)': 48}, 
        {'pool_job(input_val=5)': 60}, 
        {'pool_job(input_val=6)': 72}, 
        {'pool_job(input_val=7)': 84}, 
        {'pool_job(input_val=8)': 96}, 
        {'pool_job(input_val=9)': 108}, 
        {'pool_job(input_val=10)': 120}, 
        {'pool_job(input_val=11)': 132}
    ]
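
    A side note on ordering, since it trips people up: Pool.map() returns
    results in input order, while Pool.imap_unordered() yields each result as
    its worker finishes. A minimal sketch of the difference (double() is a
    made-up example function)...

    from multiprocessing import Pool

    def double(x):
        return x * 2

    if __name__ == "__main__":
        pool = Pool(3)
        # map() blocks and returns results in input order...
        print(pool.map(double, range(5)))                    # [0, 2, 4, 6, 8]
        # imap_unordered() yields results in completion order...
        print(list(pool.imap_unordered(double, range(5))))   # order may vary
        pool.close()
        pool.join()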
    

    Obviously there are plenty of other improvements to be made in pool_job(), such as error handling (one possible approach is sketched below), but this illustrates the essentials.
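
    A minimal sketch of that error handling, assuming you want failures
    reported in-band rather than raised in the parent; the 'bogus' input is
    contrived to force a ValueError...

    from multiprocessing import Pool

    def pool_job(input_val=0):
        # Report failures inside the result dict; an uncaught exception in a
        # worker is re-raised by Pool.map() and loses the rest of the batch...
        key = 'pool_job(input_val={0})'.format(input_val)
        try:
            return {key: int(input_val)*12}
        except (TypeError, ValueError) as error:
            return {key: 'ERROR: {0}'.format(error)}

    if __name__ == "__main__":
        pool = Pool(5)
        print(pool.map(pool_job, [0, 1, 2, 'bogus']))
        pool.close()
        pool.join()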