Search code examples
pythonpython-3.xloggingpython-multithreadingconcurrent.futures

How to buffer logs from multithreaded function call so that they can be logged in order the functions finish?


the problem

I'm trying to use the concurrent.futures library to run a function on a list of "things". The code looks something like this.

import concurrent.futures
import logging

logger = logging.getLogger(__name__)

def process_thing(thing, count):
    logger.info(f'starting processing for thing {count}')
    # Do some io related stuff
    logger.info(f'finished processing for thing {count}')

def process_things_concurrently(things)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = []
        for count, thing in enumerate(things):
            futures.append(executor.submit(process_thing, thing, count))

        for future in concurrent.futures.as_completed(futures):
            future.result()

As the code is now, the logging can happen in any order.

For example:

starting processing for thing 2
starting processing for thing 1
finished processing for thing 2
finished processing for thing 1

I want to change the code so that the records for a particular call of process_thing() are buffered until the future finishes.

In other words, all of the records for a particular call stick together. These 'groups' of records are ordered by when the call finished.

So from the example above the log output above would instead look like

starting processing for thing 2
finished processing for thing 2
starting processing for thing 1
finished processing for thing 1

what I've tried

I tried making a logger for each call that would have its own custom handler, possibly subclassing BufferingHandler. But eventually there will be lots of "things" and I read that making a lot of loggers is bad.

I'm open to anything that works! Thanks.


Solution

  • Here's a little recipe for a DelaydLogger class that puts all calls to logger's methods into a list instead of actually performing the call, until you finally do a flush where they are all fired up.

    from functools import partial
    
    class DelayedLogger:
        def __init__(self, logger):
            self.logger = logger
            self._call_stack = []  #  list of (method, *args, **kwargs) tuples
            self._delayed_methods = {
                name : partial(self._delayed_method_proxy, getattr(logger, name))
                for name in ["info", "debug", "warning", "error", "critical"]
            }
    
        def __getattr__(self, name):
            """ Proxy getattr to self.logger, except for self._delayed_methods. """
            return self._delayed_methods.get(name, getattr(self.logger, name))
    
        def _delayed_method_proxy(self, method, *args, **kwargs):
            self._call_stack.append((method, args, kwargs))
    
        def flush(self):
            """ Flush self._call_stack to the real logger. """
            for method, args, kwargs in self._call_stack:
                method(*args, **kwargs)
            self._call_stack = []
    

    In your example, you could use it like so:

    import logging
    logger = logging.getLogger(__name__)
    
    def process_thing(thing, count):    
        dlogger = DelayedLogger(logger)
        dlogger.info(f'starting processing for thing {count}')
        # Do some io related stuff
        dlogger.info(f'finished processing for thing {count}')    
        dlogger.flush()
    
    process_thing(None, 10)
    

    There may be ways to beautfiy this or make it more compact, but it should get the job done if that's what you really want.