Tags: python, parallel-processing, twisted

How to process input in parallel with python, but without processes?


I have a list of input data and would like to process it in parallel, but processing each item takes time because network I/O is involved. CPU usage is not a problem.

I would rather avoid the overhead of additional processes, since I have a lot of things to process at a time and do not want to set up inter-process communication.

# the parallel execution equivalent of this?
import time
input_data = [1,2,3,4,5,6,7]
input_processor = time.sleep
results = list(map(input_processor, input_data))  # list() forces the lazy map to run in Python 3

The code I am using makes use of twisted.internet.defer so a solution involving that is fine as well.


Solution

  • You can easily define worker threads that work in parallel until a shared queue is empty.

    from threading import Thread
    from collections import deque
    import time
    
    
    # Create a new class that inherits from Thread
    class Worker(Thread):
    
        def __init__(self, inqueue, outqueue, func):
            '''
            A worker that calls func on objects in inqueue and
            pushes the result into outqueue
    
            runs until inqueue is empty
            '''
    
            self.inqueue = inqueue
            self.outqueue = outqueue
            self.func = func
            super().__init__()
    
        # override the run method; this is what gets executed when
        # you call worker.start()
        def run(self):
            while self.inqueue:
                try:
                    data = self.inqueue.popleft()
                except IndexError:
                    # another worker emptied the queue between the
                    # check and the pop
                    break
                print('start')
                result = self.func(data)
                self.outqueue.append(result)
                print('finished')
    
    
    def test(x):
        time.sleep(x)
        return 2 * x
    
    
    if __name__ == '__main__':
        data = 12 * [1, ]
        queue = deque(data)
        result = deque()
    
        # create 3 workers working on the same input
        workers = [Worker(queue, result, test) for _ in range(3)]
    
        # start the workers
        for worker in workers:
            worker.start()
    
        # wait till all workers are finished
        for worker in workers:
            worker.join()
    
        print(result)
    

    As expected, this runs in about 4 seconds: twelve 1-second jobs shared between 3 worker threads.

    One could also write a simple Pool class to get rid of the noise in the main function:

    from threading import Thread
    from collections import deque
    import time
    
    
    class Pool():
    
        def __init__(self, n_threads):
            self.n_threads = n_threads
    
        def map(self, func, data):
            inqueue = deque(data)
            result = deque()
    
            workers = [Worker(inqueue, result, func) for _ in range(self.n_threads)]
    
            for worker in workers:
                worker.start()
    
            for worker in workers:
                worker.join()
    
            return list(result)
    
    
    class Worker(Thread):
    
        def __init__(self, inqueue, outqueue, func):
            '''
            A worker that calls func on objects in inqueue and
            pushes the result into outqueue
    
            runs until inqueue is empty
            '''
    
            self.inqueue = inqueue
            self.outqueue = outqueue
            self.func = func
            super().__init__()
    
        # override the run method; this is what gets executed when
        # you call worker.start()
        def run(self):
            while self.inqueue:
                try:
                    data = self.inqueue.popleft()
                except IndexError:
                    # another worker emptied the queue between the
                    # check and the pop
                    break
                print('start')
                result = self.func(data)
                self.outqueue.append(result)
                print('finished')
    
    
    def test(x):
        time.sleep(x)
        return 2 * x
    
    
    if __name__ == '__main__':
        data = 12 * [1, ]
    
        pool = Pool(6)
        result = pool.map(test, data)
    
        print(result)
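
  • For comparison: the standard library already ships a thread pool that covers this use case, so hand-rolling the Worker and Pool classes is optional. Below is a minimal sketch of the same 12-job example using concurrent.futures.ThreadPoolExecutor (not part of the answer above, just the standard-library equivalent):

    from concurrent.futures import ThreadPoolExecutor
    import time


    def test(x):
        time.sleep(x)
        return 2 * x


    if __name__ == '__main__':
        data = 12 * [1, ]

        # map() blocks until every job is done and preserves the input order
        with ThreadPoolExecutor(max_workers=6) as pool:
            result = list(pool.map(test, data))

        print(result)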
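
  • Since the question mentions twisted.internet.defer: Twisted's own thread-pool helpers express the same idea without writing a Worker class. This is only a rough, untested sketch along those lines; deferToThread runs the blocking call in the reactor's thread pool and gatherResults fires once every Deferred has a result:

    from twisted.internet import defer, reactor
    from twisted.internet.threads import deferToThread
    import time


    def test(x):
        time.sleep(x)
        return 2 * x


    def main():
        data = 12 * [1, ]

        # one Deferred per item; the blocking calls run in the reactor's thread pool
        deferreds = [deferToThread(test, item) for item in data]
        done = defer.gatherResults(deferreds)

        def report(results):
            # print the results (or the failure) and shut the reactor down
            print(results)
            reactor.stop()

        done.addBoth(report)


    if __name__ == '__main__':
        reactor.callWhenRunning(main)
        reactor.run()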