I'm playing around with `logging.handlers.QueueHandler` (I'm trying to integrate it into my pytest suite). Here's my MRE:
```python
import concurrent.futures
import logging
import logging.handlers
import multiprocessing
import threading

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

def init_job(log_queue):
    logging.getLogger().handlers = [logging.handlers.QueueHandler(log_queue)]

def func():
    logger.info('Here')

def thread_func(log_queue):
    while (record := log_queue.get()) is not None:
        logger.info('Handling record')
        logger.handle(record)

def main():
    log_queue = multiprocessing.Queue()
    thread = threading.Thread(target=thread_func, args=(log_queue,))
    thread.start()
    with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
        future = executor.submit(func)
        future.result()
        log_queue.put(None)
        thread.join()

if __name__ == '__main__':
    main()
```
This works as expected. However, I notice that the thread function doesn't receive the record until after `future.result()` is called. That is, if I move `future.result()` after `thread.join()`, nothing gets logged.

How do I get my records in real time?
Without explicitly waiting for the submitted task to complete you have a race condition: the main process puts the sentinel value `None` on the queue (to signal the log-handling thread to terminate) while the submitted task is still working on getting its log message onto the queue. Will the task's message arrive before the sentinel? Probably not: the worker process typically hasn't even finished starting up by the time the sentinel is put.

You only need to ensure that the sentinel value is put on the queue after all submitted tasks that put items on the queue (i.e. that log messages) have completed. That means executing the code that puts the sentinel after the termination of the `with` block, which implicitly calls `executor.shutdown(wait=True)` when exited to await the completion of all submitted tasks.
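Applied to your MRE, the only change needed is to move the sentinel put and the join out of the `with` block; here is a minimal sketch of just the revised `main` (everything else unchanged):

```python
def main():
    log_queue = multiprocessing.Queue()
    thread = threading.Thread(target=thread_func, args=(log_queue,))
    thread.start()
    with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
        executor.submit(func)
        # No need to call future.result() just to make logging work;
        # exiting the with block waits for all submitted tasks.
    # All tasks have completed, so every log record is already queued:
    log_queue.put(None)
    thread.join()
```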
The following demonstrates that the log-handling thread is getting the messages immediately, i.e. before the submitted pool tasks complete:
```python
import concurrent.futures
import logging
import logging.handlers
import multiprocessing
import threading
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

def init_job(log_queue):
    logging.getLogger().handlers = [logging.handlers.QueueHandler(log_queue)]

def func():
    logger.info('Here sent at %f', time.time())
    time.sleep(2)
    print('func ended at', time.time())

def thread_func(log_queue):
    while (record := log_queue.get()) is not None:
        logger.handle(record)
        logger.info('Handled record received at %f', time.time())

def main():
    log_queue = multiprocessing.Queue()
    thread = threading.Thread(target=thread_func, args=(log_queue,))
    thread.start()
    with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
        executor.submit(func)
        time.sleep(1)
        executor.submit(func)
    # An implicit executor.shutdown(wait=True) is called here to wait
    # for all submitted tasks to complete.
    # Now it is safe to send the sentinel:
    log_queue.put(None)
    thread.join()

if __name__ == '__main__':
    main()
```
Prints:
INFO:root:Here sent at 1693576570.512858
INFO:root:Handled record received at 1693576570.515859
INFO:root:Here sent at 1693576571.376398
INFO:root:Handled record received at 1693576571.377398
func ended at 1693576572.5241294
func ended at 1693576573.3787067
Alternate, Simplified Approach
Instead of creating your own log-handling thread, you can use a `logging.handlers.QueueListener` instance, which runs its own internal thread and takes care of putting the required sentinel on the queue when its `stop` method is called. Again, just be sure to call `stop` after all log messages have been generated:
```python
import concurrent.futures
import logging
import logging.handlers
import multiprocessing
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()

def init_job(log_queue):
    logging.getLogger().handlers = [logging.handlers.QueueHandler(log_queue)]

def func():
    logger.info('Here sent at %f', time.time())
    time.sleep(2)
    logger.info('func ended at %f', time.time())

def main():
    log_queue = multiprocessing.Queue()
    # To output to sys.stderr:
    log_handler = logging.StreamHandler()
    queue_listener = logging.handlers.QueueListener(log_queue, log_handler)
    queue_listener.start()  # start the listener's internal thread
    with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
        executor.submit(func)
        time.sleep(1)
        executor.submit(func)
    # An implicit executor.shutdown(wait=True) is called here to wait
    # for all submitted tasks to complete.
    # Stop the listener thread (this puts the sentinel on the queue):
    queue_listener.stop()

if __name__ == '__main__':
    main()
```
Prints:
Here sent at 1693657780.574981
Here sent at 1693657781.438310
func ended at 1693657782.580926
func ended at 1693657783.445612
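As a side note, `QueueListener` accepts any number of handlers plus a `respect_handler_level` keyword argument, so the relayed records can be formatted and filtered like any other. A small sketch (the format string is only illustrative):

```python
import logging
import logging.handlers
import multiprocessing

log_queue = multiprocessing.Queue()
log_handler = logging.StreamHandler()
# The format string here is just an example:
log_handler.setFormatter(
    logging.Formatter('%(asctime)s %(levelname)s %(processName)s: %(message)s')
)
# respect_handler_level=True makes the listener honor each handler's own
# level instead of unconditionally passing every record to it:
queue_listener = logging.handlers.QueueListener(
    log_queue, log_handler, respect_handler_level=True
)
```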