Tags: python, multithreading, oop, parallel-processing, multiprocessing

Multi-processing code not working in while loop


Happy Sunday.

I have this code that I want to run using the multiprocessing module, but it just doesn't work for some reason.

with ProcessPoolExecutor() as executor:
    while True:
        if LOCAL_RUN:
            print("ALERT: Doing a local run of the automation with limited capabilities.")

        list_of_clients = db_manager.get_clients()
        random.shuffle(list_of_clients)

        list_of_not_attempted_clients_domains = db_manager.tags()
        group_of_clients_handlers = {}

        # no matches
        if not list_of_not_attempted_clients_domains:
            sleep = 60 * 10
            pretty_print(f'No matches found. Sleeping for {sleep}s')
            time.sleep(sleep)
            continue

        for client in list_of_clients:
            client_id = client[0]
            client_name = client[1]
            group_of_clients_handlers[client_id] = [ClientsHandler(db_manager), client_name]

        #  MULTI-PROCESSING CODE
        try:
            print('running function...')
            executor.map(
                partial(run, group_of_clients_handlers=group_of_clients_handlers),
                list_of_not_attempted_clients_domains
            )
        except Exception as err:
            print(err)

Despite all my attempts to debug this, I have no idea why it doesn't work. I suspect it relates to processes taking time to start up or to task scheduling, but I am not certain.

The while loop just keeps running and I see all the print statements, such as "running function...", but the run function never executes. The run function is a very large function that contains other large nested functions.

The except block doesn't print out any error either. Would love to hear what you think...


Solution

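  • ProcessPoolExecutor.map returns an iterator. An exception raised in a worker is only re-raised when you retrieve that result from the iterator, so if you never consume the iterator, the exception is silently discarded. That is why the except block in the question never prints anything.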

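    from concurrent.futures import ProcessPoolExecutor

    def raising_func(val):
        raise ValueError(val)

    # The guard is needed on platforms that spawn worker processes.
    if __name__ == "__main__":
        # Iterator never consumed: the ValueError is silently discarded.
        with ProcessPoolExecutor(4) as pool:
            pool.map(raising_func, [1, 2, 3, 4, 5])

        # Consuming the iterator re-raises the worker's exception.
        with ProcessPoolExecutor(4) as pool:
            list(pool.map(raising_func, [1, 2, 3, 4, 5]))  # <---- exception is thrown from here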
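Applied to the loop in the question, one option is to submit each item separately and call result() on every future, so that one failing item is reported without hiding the others. The following is only a rough sketch under assumptions: run here is a hypothetical stand-in for the asker's real function, the domain strings are made up, and run_batch is a helper name invented for this illustration.

```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial


def run(domain, group_of_clients_handlers):
    # Hypothetical stand-in for the asker's large run function.
    if domain == "bad.example":
        raise ValueError(f"cannot process {domain}")
    return f"processed {domain}"


def run_batch(executor, domains, handlers):
    """Submit one task per domain and collect results and errors separately."""
    task = partial(run, group_of_clients_handlers=handlers)
    futures = {executor.submit(task, d): d for d in domains}
    results, errors = {}, {}
    for future in as_completed(futures):
        domain = futures[future]
        try:
            # result() re-raises any exception raised in the worker.
            results[domain] = future.result()
        except Exception as err:
            errors[domain] = err
    return results, errors


if __name__ == "__main__":
    with ProcessPoolExecutor(2) as executor:
        results, errors = run_batch(executor, ["a.example", "bad.example"], {})
    print("results:", results)
    print("errors:", errors)
```

If the errors dictionary reports pickling failures rather than errors from inside run, the arguments being passed (for example, handler objects holding a database connection) may not be transferable to a worker process. That is a guess about the asker's setup, not something the question confirms.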