In Python 3, is it possible to use a subclass of `Thread` in the context of a `concurrent.futures.ThreadPoolExecutor`, so that each thread can be individually initialized before processing (presumably many) work items?
I'd like to use the convenient `concurrent.futures` API for a piece of code that syncs files with S3 objects (each work item is one file to sync if the corresponding S3 object is nonexistent or out of sync). I would like each worker thread to do some initialization first, such as setting up a `boto3.session.Session`. The pool of workers would then be ready to process potentially thousands of work items (files to sync).
BTW, if a thread dies for some reason, is it reasonable to expect a new thread to be automatically created and added back to the pool?
(Disclaimer: I am much more familiar with Java's multithreading framework than with Python's.)
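Regarding the question about a thread dying: the sketch below, assuming CPython's `ThreadPoolExecutor` implementation, shows that an exception raised by a work item does not kill the worker. The exception is stored on the returned `Future` (retrievable with `Future.exception()` or re-raised by `Future.result()`), and the same worker thread goes on to run the next item. The functions `fail` and `whoami` are hypothetical, for illustration only.

```python
import concurrent.futures
import threading


def fail():
    # Hypothetical work item that always raises.
    raise ValueError("boom")


def whoami():
    # Report which worker thread ran this work item.
    return threading.current_thread().name


# A single worker, so every item must run on the same thread.
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(whoami).result()
    failed = executor.submit(fail)
    err = failed.exception()  # blocks until done, returns the raised exception
    second = executor.submit(whoami).result()

print(type(err).__name__, first == second)  # prints "ValueError True"
```

Because the executor catches the exception on the worker's behalf, there is normally no dead thread to replace; the failure surfaces only when you inspect the `Future`.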
So it seems that a simple solution to my problem is to use `threading.local` to store a per-thread "session" (in the mockup below, just a random int). Perhaps not the cleanest approach, but for now it will do. Here is a mockup (Python 3.5.1):
```python
import time
import threading
import concurrent.futures
import random
import logging

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-0s) %(relativeCreated)d - %(message)s')

x = [0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]

# Per-thread storage: each worker thread sees its own attributes.
mydata = threading.local()

def do_work(secs):
    if 'session' in mydata.__dict__:
        logging.debug('re-using session "{}"'.format(mydata.session))
    else:
        # First work item on this thread: create its "session".
        mydata.session = random.randint(0, 1000)
        logging.debug('created new session: "{}"'.format(mydata.session))
    time.sleep(secs)
    logging.debug('slept for {} seconds'.format(secs))
    return secs

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    y = executor.map(do_work, x)
print(list(y))
```
This produces the following output, showing that the "sessions" are indeed local to each thread and are reused:
```
(Thread-1) 29 - created new session: "855"
(Thread-2) 29 - created new session: "58"
(Thread-3) 30 - created new session: "210"
(Thread-1) 129 - slept for 0.1 seconds
(Thread-1) 130 - re-using session "855"
(Thread-2) 130 - slept for 0.1 seconds
(Thread-2) 130 - re-using session "58"
(Thread-3) 230 - slept for 0.2 seconds
(Thread-3) 230 - re-using session "210"
(Thread-3) 331 - slept for 0.1 seconds
(Thread-3) 331 - re-using session "210"
(Thread-3) 331 - slept for 0.0 seconds
(Thread-1) 530 - slept for 0.4 seconds
(Thread-2) 1131 - slept for 1.0 seconds
[0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]
```
Minor note about logging: to use this in an IPython notebook, the logging setup needs to be modified slightly (since IPython has already set up a root logger). A more robust logging setup would be:
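```python
import logging

FMT = '(%(threadName)-0s) %(relativeCreated)d - %(message)s'

# get_ipython is defined in the namespace of an IPython session.
IN_IPYNB = 'get_ipython' in vars()
if IN_IPYNB:
    # basicConfig() does nothing if the root logger already has handlers,
    # so adjust IPython's existing handlers instead.
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    for h in logger.handlers:
        h.setFormatter(logging.Formatter(FMT))
else:
    logging.basicConfig(level=logging.DEBUG, format=FMT)
```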
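As far as its documented API goes, `ThreadPoolExecutor` offers no hook for supplying a `Thread` subclass. On Python 3.7 and later, however, it accepts `initializer` (and `initargs`) arguments: a callable run once in each worker thread before it processes any work item. Combined with `threading.local`, this gives the per-thread setup asked about without the "does the session exist yet?" check. The sketch below reuses the mockup's random int as a stand-in for something like a `boto3.session.Session`; `init_worker` is a hypothetical name.

```python
import concurrent.futures
import random
import threading
import time

# Per-thread storage: each worker thread sees its own attributes.
mydata = threading.local()


def init_worker():
    # Runs once per worker thread (Python 3.7+), before any work item.
    # A random int stands in for an expensive per-thread resource,
    # e.g. a boto3 session (assumption, as in the mockup).
    mydata.session = random.randint(0, 1000)


def do_work(secs):
    time.sleep(secs)
    # Every thread was initialized, so the session always exists here.
    return mydata.session


x = [0.1, 0.1, 0.2, 0.4, 1.0, 0.1, 0.0]
with concurrent.futures.ThreadPoolExecutor(max_workers=3,
                                           initializer=init_worker) as executor:
    sessions = list(executor.map(do_work, x))

# One entry per work item, but at most 3 distinct sessions (one per thread).
print(sessions)
```

If the initializer raises, the executor is marked broken and pending and future submissions fail with `concurrent.futures.thread.BrokenThreadPool`, so expensive setup that can fail should be handled with that in mind.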