Tags: python, logging, multiprocessing

Logging from QueueHandler not appearing until future.result() is called


I'm playing around with logging.handlers.QueueHandler (I'm trying to integrate it into my pytest suite). Here's my MRE:

import concurrent.futures
import logging
import logging.handlers
import multiprocessing
import threading

logging.basicConfig(level=logging.INFO)

logger = logging.getLogger()

def init_job(log_queue):
    logging.getLogger().handlers = [logging.handlers.QueueHandler(log_queue)]

def func():
    logger.info('Here')

def thread_func(log_queue):
    while (record := log_queue.get()) is not None:
        logger.info('Handling record')
        logger.handle(record)

def main():
    log_queue = multiprocessing.Queue()
    thread = threading.Thread(target=thread_func, args=(log_queue,))
    thread.start()
    with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
        future = executor.submit(func)
        future.result()
        log_queue.put(None)
        thread.join()

if __name__ == '__main__':
    main()

This works as expected. However, I notice that the thread function doesn't receive the record until after future.result() is called. That is, if I put future.result() after thread.join(), nothing gets logged.
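
For concreteness, this is the reordered variant of the with block that logs nothing:

with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
    future = executor.submit(func)
    log_queue.put(None)
    thread.join()
    future.result()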

How do I get my records in real time?


Solution

  • Without explicitly waiting for the submitted task to complete, you have a race condition: will the submitted task manage to put a log record on the queue before the main process puts the sentinel value None on the queue, signaling the log-handling thread to terminate? Probably not: spinning up a pool worker process takes far longer than the main process needs to reach the next statement, so the sentinel usually wins the race and the thread exits before the record ever arrives.

    You only need to ensure that the sentinel is put on the queue after all submitted tasks that log (i.e. that put records on the queue) have completed. That means executing log_queue.put(None) after the with block terminates, since exiting the block implicitly calls executor.shutdown(wait=True), which waits for all submitted tasks to complete.
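
    Applied to your MRE, a minimal sketch of the corrected main() (same function and variable names as your code) looks like this:

    def main():
        log_queue = multiprocessing.Queue()
        thread = threading.Thread(target=thread_func, args=(log_queue,))
        thread.start()
        with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
            future = executor.submit(func)
            future.result()  # optional; the implicit shutdown(wait=True) also waits
        # The with block has implicitly called executor.shutdown(wait=True),
        # so every task (and its log record) is done before the sentinel:
        log_queue.put(None)
        thread.join()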

    The following fuller demonstration shows that the log-handling thread gets the messages immediately, i.e. before the submitted pool tasks complete:

    import concurrent.futures
    import logging
    import logging.handlers
    import multiprocessing
    import threading
    import time
    
    logging.basicConfig(level=logging.INFO)
    
    logger = logging.getLogger()
    
    def init_job(log_queue):
        logging.getLogger().handlers = [logging.handlers.QueueHandler(log_queue)]
    
    def func():
        logger.info('Here sent at %f', time.time())
        time.sleep(2)
        print('func ended at', time.time())
    
    def thread_func(log_queue):
        while (record := log_queue.get()) is not None:
            logger.handle(record)
            logger.info('Handled record received at %f', time.time())
    
    def main():
        log_queue = multiprocessing.Queue()
        thread = threading.Thread(target=thread_func, args=(log_queue,))
        thread.start()
    
        with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
            executor.submit(func)
            time.sleep(1)
            executor.submit(func)
        # An implicit executor.shutdown(wait=True) is called here to wait
        # for all submitted tasks to complete.
    
        # Now it is safe to send sentinel:
        log_queue.put(None)
        thread.join()
    
    if __name__ == '__main__':
        main()
    

    Prints:

    INFO:root:Here sent at 1693576570.512858
    INFO:root:Handled record received at 1693576570.515859
    INFO:root:Here sent at 1693576571.376398
    INFO:root:Handled record received at 1693576571.377398
    func ended at 1693576572.5241294
    func ended at 1693576573.3787067
    

    Alternate, Simplified Approach

    Instead of creating your own log-handling thread, you can use a logging.handlers.QueueListener instance, which runs the handling loop in its own thread and takes care of putting the required sentinel on the queue when its stop method is called. Again, just be sure to call stop only after all log messages have been generated:

    import concurrent.futures
    import logging
    import logging.handlers
    import multiprocessing
    import time
    
    logging.basicConfig(level=logging.INFO)
    
    logger = logging.getLogger()
    
    def init_job(log_queue):
        logging.getLogger().handlers = [logging.handlers.QueueHandler(log_queue)]
    
    def func():
        logger.info('Here sent at %f', time.time())
        time.sleep(2)
        logger.info('func ended at %f', time.time())
    
    def main():
        log_queue = multiprocessing.Queue()
        # To output to sys.stderr:
        log_handler = logging.StreamHandler()
        queue_listener = logging.handlers.QueueListener(log_queue, log_handler)
        queue_listener.start() # start thread
    
        with concurrent.futures.ProcessPoolExecutor(initializer=init_job, initargs=(log_queue,)) as executor:
            executor.submit(func)
            time.sleep(1)
            executor.submit(func)
        # An implicit executor.shutdown(wait=True) is called here to wait
        # for all submitted tasks to complete.
    
        # Stop thread:
        queue_listener.stop()
    
    if __name__ == '__main__':
        main()
    

    Prints:

    Here sent at 1693657780.574981
    Here sent at 1693657781.438310
    func ended at 1693657782.580926
    func ended at 1693657783.445612
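
    As an aside, QueueListener's internal thread does essentially what your thread_func did by hand: it blocks on the queue, handles each record, and exits on a sentinel. A simplified sketch of that loop (not the actual stdlib source) in terms of the names above:

    while True:
        record = log_queue.get()
        if record is None:  # sentinel enqueued by queue_listener.stop()
            break
        log_handler.handle(record)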