I'm trying to use the concurrent.futures library to run a function on a list of "things". The code looks something like this:
    import concurrent.futures
    import logging

    logger = logging.getLogger(__name__)

    def process_thing(thing, count):
        logger.info(f'starting processing for thing {count}')
        # Do some io related stuff
        logger.info(f'finished processing for thing {count}')

    def process_things_concurrently(things):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for count, thing in enumerate(things):
                futures.append(executor.submit(process_thing, thing, count))
            for future in concurrent.futures.as_completed(futures):
                future.result()
As the code is now, the logging can happen in any order.
For example:
starting processing for thing 2
starting processing for thing 1
finished processing for thing 2
finished processing for thing 1
I want to change the code so that the records for a particular call of process_thing()
are buffered until the future finishes.
In other words, all of the records for a particular call stick together. These 'groups' of records are ordered by when the call finished.
So, from the example above, the log output would instead look like:
starting processing for thing 2
finished processing for thing 2
starting processing for thing 1
finished processing for thing 1
I tried making a logger for each call that would have its own custom handler, possibly subclassing BufferingHandler (roughly like the sketch below). But eventually there will be lots of "things", and I read that making a lot of loggers is bad.
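What I had in mind was something like this (just a sketch: make_call_logger is my own helper name, not a real API, and the capacity is a placeholder). It buffers each call's records in a stdlib logging.handlers.MemoryHandler, but every call leaks a logger, since loggers live forever in the logging module's registry:

    import logging
    import logging.handlers

    def make_call_logger(count, target_handler):
        # One throwaway logger per call, buffered by a MemoryHandler.
        call_logger = logging.getLogger(f'{__name__}.call{count}')
        call_logger.propagate = False  # keep records away from the shared handlers
        buffer = logging.handlers.MemoryHandler(
            capacity=10000,                   # big enough that it never auto-flushes
            flushLevel=logging.CRITICAL + 1,  # no record level triggers a flush
            target=target_handler,
        )
        call_logger.addHandler(buffer)
        return call_logger, buffer  # call buffer.flush() when the future finishes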
I'm open to anything that works! Thanks.
Here's a little recipe for a DelayedLogger class that puts all calls to the logger's methods into a list instead of actually performing them, until you finally call flush(), at which point they are all emitted.
    from functools import partial

    class DelayedLogger:
        def __init__(self, logger):
            self.logger = logger
            self._call_stack = []  # list of (method, args, kwargs) tuples
            self._delayed_methods = {
                name: partial(self._delayed_method_proxy, getattr(logger, name))
                for name in ["info", "debug", "warning", "error", "critical"]
            }

        def __getattr__(self, name):
            """ Proxy getattr to self.logger, except for self._delayed_methods. """
            return self._delayed_methods.get(name, getattr(self.logger, name))

        def _delayed_method_proxy(self, method, *args, **kwargs):
            # Record the call instead of performing it.
            self._call_stack.append((method, args, kwargs))

        def flush(self):
            """ Flush self._call_stack to the real logger. """
            for method, args, kwargs in self._call_stack:
                method(*args, **kwargs)
            self._call_stack = []
In your example, you could use it like so:
    import logging

    logger = logging.getLogger(__name__)

    def process_thing(thing, count):
        dlogger = DelayedLogger(logger)
        dlogger.info(f'starting processing for thing {count}')
        # Do some io related stuff
        dlogger.info(f'finished processing for thing {count}')
        dlogger.flush()

    process_thing(None, 10)
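If you want the groups strictly ordered by when each call finished, as in your example output, note that flushing inside the worker can still interleave two groups when two calls finish at nearly the same time, because the logging lock is taken per record. One variation (a sketch, untested against your exact setup) is to return the DelayedLogger from the worker and flush it from the as_completed() loop, so the main thread serializes the groups:

    import concurrent.futures

    def process_thing(thing, count):
        dlogger = DelayedLogger(logger)
        dlogger.info(f'starting processing for thing {count}')
        # Do some io related stuff
        dlogger.info(f'finished processing for thing {count}')
        return dlogger  # don't flush here; let the caller decide when

    def process_things_concurrently(things):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(process_thing, thing, count)
                       for count, thing in enumerate(things)]
            for future in concurrent.futures.as_completed(futures):
                # Flushing from this single loop keeps each call's records
                # together and orders the groups by completion.
                future.result().flush()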
There may be ways to beautify this or make it more compact, but it should get the job done if that's what you really want.