Tags: python, multithreading, concurrency, multiprocess

Can't get multiprocessing to run processes concurrently


The code below doesn't seem to run concurrently, and I'm not sure exactly why:

import multiprocessing

def run_normalizers(config, debug, num_threads, name=None):

    def _run():
        print('Started process for normalizer')
        sqla_engine = init_sqla_from_config(config)
        image_vfs = create_s3vfs_from_config(config, config.AWS_S3_IMAGE_BUCKET)
        storage_vfs = create_s3vfs_from_config(config, config.AWS_S3_STORAGE_BUCKET)

        pp = PipedPiper(config, image_vfs, storage_vfs, debug=debug)

        if name:
            pp.run_pipeline_normalizers(name)
        else:
            pp.run_all_normalizers()
        print('Normalizer process complete')

    threads = []
    for i in range(num_threads):
        threads.append(multiprocessing.Process(target=_run))
    for t in threads:
        t.start()
    for t in threads:
        t.join()


run_normalizers(...)

The config variable is just a dictionary defined outside of the _run() function. All of the processes seem to be created, but it isn't any faster than if I do it with a single process. Basically, what's happening in the run_*_normalizers() functions is reading from a queue table in a database (SQLAlchemy), then making a few HTTP requests, and then running a 'pipeline' of normalizers to modify data and save it back into the database. I'm coming from JVM land, where threads are 'heavy' and often used for parallelism, and I'm a bit confused by this, as I thought the multiprocessing module was supposed to get around the limitations of Python's GIL.


Solution

  • Fixed my multiprocessing problem, and actually switched to threads. Not sure what actually fixed it though; I just re-architected everything into workers and tasks and things are flying now. Here's the basics of what I did:

    import abc
    from queue import Empty, Queue  # Python 3 name; "Queue" in Python 2
    from threading import Thread
    
    class AbstractTask(abc.ABC):
        """
            The base task
        """
    
        @abc.abstractmethod
        def run_task(self):
            pass
    
    class TaskRunner(object):
    
        def __init__(self, queue_size, num_threads=1, stop_on_exception=False):
            super().__init__()
            self.queue              = Queue(queue_size)
            self.execute_tasks      = True
            self.stop_on_exception  = stop_on_exception
    
            # create a worker
            def _worker():
                while self.execute_tasks:
    
                    # block for up to a second waiting for a task, so the
                    # loop re-checks execute_tasks instead of busy-spinning
                    try:
                        task = self.queue.get(True, 1)
                    except Empty:
                        continue
    
                    # execute the task
                    failed = True
                    try:
                        task.run_task()
                        failed = False
                    finally:
                        if failed and self.stop_on_exception:
                            print('Stopping due to exception')
                            self.execute_tasks = False
                        self.queue.task_done()
    
            # start threads
            for i in range(int(num_threads)):
                t = Thread(target=_worker)
                t.daemon = True
                t.start()
    
    
        def add_task(self, task, block=True, timeout=None):
            """
                Adds a task
            """
            if not self.execute_tasks:
                raise Exception('TaskRunner is not accepting tasks')
            self.queue.put(task, block, timeout)
    
    
        def wait_for_tasks(self):
            """
                Waits for tasks to complete
            """
            if not self.execute_tasks:
                raise Exception('TaskRunner is not accepting tasks')
            self.queue.join()
    

    All I do is create a TaskRunner, add tasks to it (thousands of them), and then call wait_for_tasks(). So, obviously, in the re-architecture I 'fixed' some other problem that I had. Odd though.
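    For comparison, the same task/worker pattern ships with the standard library as concurrent.futures. A minimal sketch of the equivalent usage (run_task here is a stand-in for a real task body, not code from the post):

    ```python
    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_task(item):
        # stand-in for AbstractTask.run_task(); a real task would do
        # the DB reads, HTTP requests, and normalizer pipeline work
        return item * 2

    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(run_task, i) for i in range(10)]
        results = [f.result() for f in as_completed(futures)]

    print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
    ```

    The executor replaces the hand-rolled queue, worker loop, and join logic, and f.result() re-raises any exception from the task instead of silently killing a worker thread.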