python, error-handling, multiprocessing, python-multiprocessing

Multiprocessing in python won't keep log of errors in log file


I've implemented multiprocessing in a data analysis I'm running, but sometimes there are errors, and instead of having the entire queue get killed I want them to just be ignored, so I implemented a try statement. But new errors keep cropping up. Is there a more exhaustive list of errors I can include in the except statement? In general I'd just like errors to be logged and then the code to move on.
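
Something like this is the pattern I'm after (run_analysis here is just a stand-in for my real analysis step):

import logging
import traceback

def run_analysis(df):          # stand-in for the real analysis
    raise KeyError('wrong_col')

try:
    run_analysis(None)
except Exception:              # catch everything rather than a hand-picked list
    logging.error(traceback.format_exc())
    # ...and carry on with the next dataset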

Secondly though, the errors are not getting logged in the log file I'm making, and I'm not sure why. This is an example of my code; I've replaced the analysis in the try statement with some simpler steps that more or less do the same thing: take the data and write an output to a CSV for each dataset a process is running on (here represented with the dictionary of dataframes). NOTE: I have purposely introduced a KeyError here by misnaming one of the columns in the second dataframe.

Example code...

import pandas as pd
import numpy as np
import logging
import traceback
from multiprocessing import Pool, Manager, Process

output_dir = ''
input_dir = 'df_folder'

# make our data
    # Create a dict of 5 dataframes with 3 columns of ten entries each
    df_dict = {i: pd.DataFrame(np.random.rand(10, 3), columns=['col1', 'col2', 'col3']) for i in range(5)}
    # Introduce an error by changing a column name in one of the dataframes
    df_dict[1].columns = ['col1', 'col2', 'wrong_col']

for key, df in df_dict.items():
    file_name = f"{key}_df.csv"
    file_path = os.path.join(input_dir, file_name)
    df.to_csv(file_path, index=False)

   
# define functions for multiprocessing and error logging...
def listener_process(queue):
    logging.basicConfig(filename='abi_detector_app.log', filemode='w', format='%(name)s - %(levelname)s - %(message)s')
    while True:
        message = queue.get()
        if message == 'kill':
            break
        logging.error(message)



def example_process(df_id, queue):
    df = pd.read_csv(f"{input_dir}/{df_id}_df.csv")
    try:
        for col in ['col1', 'col2', 'col3']:
            mean = df[col].mean()
            std = df[col].std()
            result = pd.DataFrame({'mean': [mean], 'std': [std]}, index=[col])
            result.to_csv(f'{output_dir}/df_{df_id}_{col}_stats.csv')
    except (IndexError, KeyError) as e:
        logging.error('Error in dataframe id: %s', df_id)
        logging.error(traceback.format_exc())



manager = Manager()
queue = manager.Queue()
listener = Process(target=listener_process, args=(queue,))
listener.start()


pool_size = 5
df_list = df_dict.keys()
# run the processes with the specified number of cores

# new code which passes the error messages to the listener process
with Pool(pool_size) as p:
    p.starmap(example_process, [(df_id, queue) for df_id in df_list])

queue.put('kill')
listener.join()

NOTE: df_dict is a placeholder for what my script is actually doing. I edited the example so that the generated dataframes are written to a folder as CSVs, and example_process then loads them. This is a better example of what's happening, because df_dict really does not need to be shared across processes; only the error logging file does.


Solution

  • I have modified your code based on "Logging to a single file from multiple processes" from the Python Logging Cookbook.

    Update

    The code executes successfully whether the platform uses the fork method of starting new processes (the default on most Unix-like systems) or must use the spawn method (as on Windows).
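
    As a quick check, you can query which start method your interpreter will use; get_start_method and get_all_start_methods are standard multiprocessing functions. (A minor caveat I'll note here: recent Python versions on macOS also default to spawn.)

    import multiprocessing as mp

    # Default start method on this platform: 'fork' on most Unix-like
    # systems, 'spawn' on Windows (and on macOS since Python 3.8).
    print(mp.get_start_method())
    # All start methods this platform supports:
    print(mp.get_all_start_methods())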

    We would like the code that creates the dataframes to be executed once. If that code is at global scope but not within an if __name__ == '__main__': block, then under Windows each pool process will re-create the dataframes. This could be inefficient, depending on the complexity of the code. Moreover, the code uses a random number generator to create the dataframes, so each pool process would create dataframes with different data, which could be a problem.

    Therefore, we need to ensure that the dataframe-creation code is executed only once, by the main process, and that each pool process is initialized with the dataframes once, using a pool initializer function. This technique works whether we are running under Windows or Linux. However, if we are running under Linux or some other platform that supports the fork method of creating processes, then it is generally more efficient to let the pool processes inherit the dataframes from the main process rather than passing the dataframes to a pool initializer.

    The following code now checks whether we can use the fork method and, if so, lets the pool processes inherit the dataframes. A pool initializer is still used to initialize the multiprocessing log handling for each process that will be logging messages.

    import pandas as pd
    import numpy as np
    import logging
    import logging.handlers
    import traceback
    import sys
    from multiprocessing import Pool, Queue, Process, set_start_method
    
    def listener_process(queue):
        root = logging.getLogger()
        h = logging.FileHandler('abi_detector_app.log', mode='w')
        f = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
        h.setFormatter(f)
        root.addHandler(h)
    
        while True:
            try:
                record = queue.get()
                if record is None:  # We send this as a sentinel to tell the listener to quit.
                    break
                logger = logging.getLogger(record.name)
                logger.handle(record)  # No level or filter logic applied - just do it!
            except Exception:
                print('Whoops! Problem:', file=sys.stderr)
                traceback.print_exc(file=sys.stderr)
    
    
    def init_pool(*args):
        queue = args[0]
        h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
        root = logging.getLogger()
        root.addHandler(h)
        # send all messages, for demo; no other level or filter logic applied.
        root.setLevel(logging.DEBUG)
    
        if len(args) == 2:
            # We have additionally been passed the dataframes because they could not
            # be inherited.
            global df_dict
    
            df_dict = args[1]
    
    
    output_dir = ''
    
    def example_process(df_id):
        df = df_dict[df_id]
        try:
            for col in ['col1', 'col2', 'col3']:
                mean = df[col].mean()
                std = df[col].std()
                result = pd.DataFrame({'mean': [mean], 'std': [std]}, index=[col])
                result.to_csv(f'{output_dir}/df_{df_id}_{col}_stats.csv')
            logging.debug(f'Processing complete for id {df_id}.')
        except Exception:  # Catch all possible exceptions
            logging.error('Error in dataframe id: %s', df_id)
            logging.error(traceback.format_exc())
    
    
    if __name__ == '__main__':
        # If we can use the fork method of starting new processes, then the pool processes
        # can inherit the dataframes from the main process, which is normally more efficient
        # than passing the dataframes to each pool process using the pool initializer.
        try:
            set_start_method('fork')
        except:
            using_fork = False
        else:
            using_fork = True
    
        # Create a dict of 5 dataframes with 3 columns of ten entries each.
        # We are ensuring that the code to create the dataframes is only executed once (by
        # the main process):
        df_dict = {i: pd.DataFrame(np.random.rand(10, 3), columns=['col1', 'col2', 'col3']) for i in range(5)}
        # Introduce an error by changing a column name in one of the dataframes
        df_dict[1].columns = ['col1', 'col2', 'wrong_col']
    
        queue = Queue()
        listener = Process(target=listener_process, args=(queue,))
        listener.start()
    
        pool_size = 5
        df_list = df_dict.keys()
        # run the processes with the specified number of cores
    
        # Can the pool processes inherit the dataframes or must we pass them
        # to each pool process?
        pool_initargs = (queue,) if using_fork else (queue, df_dict)
    
        # new code which passes the error messages to the listener process
        with Pool(pool_size, initializer=init_pool, initargs=pool_initargs) as p:
            p.map(example_process, df_list)
    
        #queue.put('kill')
        queue.put(None) # Use None as the sentinel
        listener.join()
    

    The contents of abi_detector_app.log:

    root - DEBUG - Processing complete for id 0.
    root - ERROR - Error in dataframe id: 1
    root - ERROR - Traceback (most recent call last):
      File "C:\Program Files\Python38\lib\site-packages\pandas\core\indexes\base.py", line 3361, in get_loc
        return self._engine.get_loc(casted_key)
      File "pandas\_libs\index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
      File "pandas\_libs\index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
      File "pandas\_libs\hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
      File "pandas\_libs\hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
    KeyError: 'col3'
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "C:\Booboo\test\test.py", line 46, in example_process
        mean = df[col].mean()
      File "C:\Program Files\Python38\lib\site-packages\pandas\core\frame.py", line 3455, in __getitem__
        indexer = self.columns.get_loc(key)
      File "C:\Program Files\Python38\lib\site-packages\pandas\core\indexes\base.py", line 3363, in get_loc
        raise KeyError(key) from err
    KeyError: 'col3'
    
    root - DEBUG - Processing complete for id 2.
    root - DEBUG - Processing complete for id 3.
    root - DEBUG - Processing complete for id 4.
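
    As an aside, instead of hand-rolling the listener loop you could use logging.handlers.QueueListener from the standard library, which drains the queue in a background thread of the main process and dispatches each record to the handlers you give it. A minimal sketch of the same idea (run in the main process; the pool setup is elided):

    import logging
    import logging.handlers
    from multiprocessing import Queue

    queue = Queue()
    file_handler = logging.FileHandler('abi_detector_app.log', mode='w')
    file_handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))

    # QueueListener runs in a background thread and forwards every record
    # it pulls off the queue to the attached handler(s).
    listener = logging.handlers.QueueListener(queue, file_handler)
    listener.start()

    # ... create the Pool with initializer=init_pool, initargs=(queue,) and run the workers ...

    listener.stop()  # flushes and stops the background thread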
    

    Update 2

    Since you have updated your question, I have modified my answer to conform to your new requirements. Note that the following code only works on platforms that use the fork method for creating processes; see the previous update for how to modify the code to be cross-platform compatible. Also, please see my added comments in your code; they contain the string (Booboo) so that you can easily search for them.

    import pandas as pd
    import numpy as np
    import logging
    import logging.handlers
    import traceback
    # Use a multiprocessing queue instead of a managed queue (Booboo):
    from multiprocessing import Pool, Queue, Process
    # Add missing import (Booboo):
    import os
    
    output_dir = ''
    input_dir = 'df_folder'
    
    # make our data
    # Corrected indentation problem (Booboo)
    # Create a dict of 5 dataframes with 3 columns of ten entries each
    df_dict = {i: pd.DataFrame(np.random.rand(10, 3), columns=['col1', 'col2', 'col3']) for i in range(5)}
    # Introduce an error by changing a column name in one of the dataframes
    df_dict[1].columns = ['col1', 'col2', 'wrong_col']
    
    for key, df in df_dict.items():
        file_name = f"{key}_df.csv"
        file_path = os.path.join(input_dir, file_name)
        df.to_csv(file_path, index=False)
    
    def init_pool(queue):
        h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
        root = logging.getLogger()
        root.addHandler(h)
        # send all messages, for demo; no other level or filter logic applied.
        root.setLevel(logging.DEBUG)
    
    def listener_process(queue):
        root = logging.getLogger()
        h = logging.FileHandler('abi_detector_app.log', mode='w')
        f = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
        h.setFormatter(f)
        root.addHandler(h)
    
        while True:
            message = queue.get()
            if message == 'kill':
                break
            logger = logging.getLogger(message.name)
            logger.handle(message)
    
    # There is no point in passing queue to this worker function
    # since it is not used (Booboo):
    def example_process(df_id):
        df = pd.read_csv(f"{input_dir}/{df_id}_df.csv")
        try:
            for col in ['col1', 'col2', 'col3']:
                mean = df[col].mean()
                std = df[col].std()
                result = pd.DataFrame({'mean': [mean], 'std': [std]}, index=[col])
                result.to_csv(f'{output_dir}/df_{df_id}_{col}_stats.csv')
        except (IndexError, KeyError) as e:
            logging.error('Error in dataframe id: %s', df_id)
            logging.error(traceback.format_exc())
    
    # A multiprocessing.Queue will offer better performance than a
    # managed queue (Booboo):
    queue = Queue()
    listener = Process(target=listener_process, args=(queue,))
    listener.start()
    
    pool_size = 5
    df_list = df_dict.keys()
    # run the processes with the specified number of cores:
    with Pool(pool_size, initializer=init_pool, initargs=(queue,)) as p:
        # There is no point in passing queue to this worker function
        # since it is not used (Booboo):
        p.map(example_process, df_list)
    
    queue.put('kill')
    listener.join()
    

    Contents of log file:

    root - ERROR - Error in dataframe id: 1
    root - ERROR - Traceback (most recent call last):
      File "/home/Booboo/.local/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 3361, in get_loc
        return self._engine.get_loc(casted_key)
      File "pandas/_libs/index.pyx", line 76, in pandas._libs.index.IndexEngine.get_loc
      File "pandas/_libs/index.pyx", line 108, in pandas._libs.index.IndexEngine.get_loc
      File "pandas/_libs/hashtable_class_helper.pxi", line 5198, in pandas._libs.hashtable.PyObjectHashTable.get_item
      File "pandas/_libs/hashtable_class_helper.pxi", line 5206, in pandas._libs.hashtable.PyObjectHashTable.get_item
    KeyError: 'col3'
    
    The above exception was the direct cause of the following exception:
    
    Traceback (most recent call last):
      File "/mnt/c/Booboo/test/test.py", line 52, in example_process
        mean = df[col].mean()
      File "/home/Booboo/.local/lib/python3.9/site-packages/pandas/core/frame.py", line 3458, in __getitem__
        indexer = self.columns.get_loc(key)
      File "/home/Booboo/.local/lib/python3.9/site-packages/pandas/core/indexes/base.py", line 3363, in get_loc
        raise KeyError(key) from err
    KeyError: 'col3'