multithreading · python-3.x · concurrent.futures

Python 3 concurrent.futures and per-thread initialization


In Python 3, is it possible to use a subclass of Thread in the context of a concurrent.futures.ThreadPoolExecutor, so that they can be individually initialized before processing (presumably many) work items?

I'd like to use the convenient concurrent.futures API for a piece of code that syncs local files with S3 objects (each work item is one file to sync whenever the corresponding S3 object is missing or out of sync). I would like each worker thread to do some initialization first, such as setting up a boto3.session.Session. That pool of workers would then be ready to process potentially thousands of work items (files to sync).
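
To make the intended usage concrete, here is roughly the shape of code I have in mind (sync_file and files_to_sync are just placeholders; the per-thread session setup is the missing piece):

    import concurrent.futures
    
    def sync_file(path):
        # Placeholder: compare the local file against its S3 object using a
        # per-thread boto3 session (the missing piece), and upload if needed.
        return path
    
    files_to_sync = ['a.txt', 'b.txt']  # in practice, potentially thousands of paths
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(sync_file, files_to_sync))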

BTW, if a thread dies for some reason, is it reasonable to expect a new thread to be automatically created and added back to the pool?

(Disclaimer: I am much more familiar with Java's multithreading framework than with Python's.)


Solution

  • So, it seems that a simple solution to my problem is to use threading.local to store a per-thread "session" (in the mockup below, just a random int). Perhaps not the cleanest approach, but for now it will do. Here is a mockup (Python 3.5.1):

    import time
    import threading
    import concurrent.futures
    import random
    import logging
    
    logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')
    
    x = [0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]
    
    mydata = threading.local()
    
    def do_work(secs):
        # Reuse this thread's session if it was already created on a previous call.
        if 'session' in mydata.__dict__:
            logging.debug('re-using session "{}"'.format(mydata.session))
        else:
            mydata.session = random.randint(0,1000)
            logging.debug('created new session: "{}"'.format(mydata.session))
        time.sleep(secs)
        logging.debug('slept for {} seconds'.format(secs))
        return secs
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        y = executor.map(do_work, x)
    
    print(list(y))
    

    Produces the following output, showing that "sessions" are indeed local to each thread and reused:

    (Thread-1) 29 - created new session: "855"
    (Thread-2) 29 - created new session: "58"
    (Thread-3) 30 - created new session: "210"
    (Thread-1) 129 - slept for 0.1 seconds
    (Thread-1) 130 - re-using session "855"
    (Thread-2) 130 - slept for 0.1 seconds
    (Thread-2) 130 - re-using session "58"
    (Thread-3) 230 - slept for 0.2 seconds
    (Thread-3) 230 - re-using session "210"
    (Thread-3) 331 - slept for 0.1 seconds
    (Thread-3) 331 - re-using session "210"
    (Thread-3) 331 - slept for 0.0 seconds
    (Thread-1) 530 - slept for 0.4 seconds
    (Thread-2) 1131 - slept for 1.0 seconds
    [0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]
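
    Adapting the mockup to the actual use case is then mostly a matter of storing a boto3 session (and, say, an S3 client derived from it) in the threading.local object instead of a random int. A rough sketch, assuming boto3 is installed and credentials are configured (sync_one_file and files_to_sync are hypothetical placeholders for the real sync logic and file list):

    import threading
    import concurrent.futures
    import boto3
    
    mydata = threading.local()
    
    def sync_one_file(path):
        # Lazily create one boto3 session + S3 client per worker thread.
        if not hasattr(mydata, 's3'):
            mydata.session = boto3.session.Session()
            mydata.s3 = mydata.session.client('s3')
        # ... compare `path` against its S3 object via mydata.s3 and upload if needed ...
        return path
    
    files_to_sync = ['a.txt', 'b.txt']  # placeholder list of local paths
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        results = list(executor.map(sync_one_file, files_to_sync))
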
    

    Minor note about logging: in order to use this in an IPython notebook, the logging setup needs to be slightly modified, since IPython has already set up a root logger. A more robust logging setup would be:

    IN_IPYNB = 'get_ipython' in vars()
    
    if IN_IPYNB:
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        for h in logger.handlers:
            h.setFormatter(logging.Formatter(
                    '(%(threadName)-0s) %(relativeCreated)d - %(message)s'))
    else:
        logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')
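
    Side note: if you can require Python 3.7 or newer, ThreadPoolExecutor also accepts initializer and initargs arguments, which run a callable once in each worker thread before it starts processing work items. Combined with threading.local, that addresses the per-thread initialization question more directly; a minimal sketch:

    import threading
    import concurrent.futures
    import random
    
    mydata = threading.local()
    
    def init_worker():
        # Runs once in each worker thread, before it picks up any work items.
        mydata.session = random.randint(0, 1000)
    
    def do_work(secs):
        # Each call sees the session created by its own thread's initializer.
        return (mydata.session, secs)
    
    # initializer/initargs require Python 3.7+
    with concurrent.futures.ThreadPoolExecutor(max_workers=3,
                                               initializer=init_worker) as executor:
        print(list(executor.map(do_work, [0.1, 0.2, 0.3])))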