The following code uses 5 workers, each of which runs its own `worker_task()`:
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try: data = future.result()
However, inside each `worker_task()` I cannot tell which of the 5 workers is currently running it (a worker ID).
For example, I want to `print('worker 3 has finished')`
inside `worker_task()`, but I cannot, because `executor.submit`
does not pass any worker ID to the task.
Any solutions?
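You can get the name of the worker thread that is running the current code with `threading.current_thread().name` (the older `getName()` method still works but is deprecated since Python 3.10).
`ThreadPoolExecutor` names its threads `<prefix>_<n>`, e.g. `ThreadPoolExecutor-0_2`, and you can choose the prefix with the `thread_name_prefix=` argument. Here is an example: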
from concurrent.futures import ThreadPoolExecutor, Future
from threading import current_thread
from time import sleep
from random import randint
# Placeholder work items (the integers 0..99) standing in for real URLs.
URLS = list(range(100))
def do_some_work(url, a, b, delay=2):
    """Simulate a unit of work for *url* and report which thread did it.

    Sleeps for *delay* seconds (to mimic I/O), then draws a random integer
    with ``randint(a, b)``.

    Args:
        url: The work item (stands in for a URL).
        a: Inclusive lower bound for the random number.
        b: Inclusive upper bound for the random number.
        delay: Seconds to sleep before producing a result; defaults to 2,
            the value the example originally hard-coded.

    Returns:
        A string of the form "<thread name>||: <url>_<number>" followed by
        a newline.

    Raises:
        ValueError: If the random number drawn is 5 (a simulated failure).
    """
    sleep(delay)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError("No! 5 found!")
    # Thread.name replaces getName(), which is deprecated since Python 3.10.
    return f"{current_thread().name}||: {url}_{rand_num}\n"
def show_fut_results(fut: Future):
    """Done-callback: print the future's result, or its error.

    Intended for ``Future.add_done_callback``, so *fut* is already done when
    this runs.

    Args:
        fut: The completed future.

    A cancelled future is reported as cancelled instead of letting
    ``fut.exception()`` raise ``CancelledError`` inside the callback.
    """
    # fut.exception() raises CancelledError on a cancelled future, so check first.
    if fut.cancelled():
        print(f"{current_thread().name}| Cancelled\n")
        return
    # Query once; the future is done, so this does not block.
    exc = fut.exception()
    if exc is None:
        print(fut.result())
    else:
        print(f"{current_thread().name}| Error: {exc}\n")
if __name__ == '__main__':
    # Fan every work item out to a 10-thread pool. Nothing is collected here:
    # each future reports its own outcome through show_fut_results once it
    # completes, and leaving the `with` block waits for all of them.
    with ThreadPoolExecutor(max_workers=10) as pool:
        for url in URLS:
            pool.submit(do_some_work, url, 1, 10).add_done_callback(show_fut_results)
If you want more control over the threads, use the `threading`
module directly:
from threading import Thread
from queue import Queue
from time import sleep
from random import randint
import logging
# Stand-in work items "URL-0" through "URL-99" (imagine these are real urls).
URLS = [f"URL-{n}" for n in range(100)]
# How many worker threads to start.
WORKER_NUM = 10
def do_some_work(url: str, a: int, b: int, delay: float = 2) -> str:
    """Simulate work on *url*: sleep, then draw a random number.

    Args:
        url: The work item (stands in for a URL).
        a: Inclusive lower bound passed to ``randint``.
        b: Inclusive upper bound passed to ``randint``.
        delay: Seconds to sleep first, mimicking I/O latency; defaults to 2,
            the value the example originally hard-coded.

    Returns:
        A string of the form "<url> = <number>".

    Raises:
        ValueError: If the drawn number is 5 (a simulated failure); the
            message names the offending URL.
    """
    sleep(delay)
    rand_num = randint(a, b)
    if rand_num == 5:
        raise ValueError(f"No! 5 found in URL: {url}")
    return f"{url} = {rand_num}"
def thread_worker_func(q: Queue, a: int, b: int) -> None:
    """Target function for worker threads: consume URLs until a poison pill.

    Each URL taken from *q* is passed to ``do_some_work(url, a, b)`` and the
    result is logged. A failure on one item is logged and the worker moves on
    to the next item, so one bad URL does not kill the thread. A ``None``
    item (poison pill) makes the worker stop.

    ``q.task_done()`` is called exactly once for every item taken, including
    the poison pill, so callers may also wait on the queue with ``q.join()``.

    Args:
        q: Queue of URLs; ``None`` tells this worker to stop.
        a: Forwarded unchanged to ``do_some_work``.
        b: Forwarded unchanged to ``do_some_work``.
    """
    logging.info("Started working")
    while True:
        url = q.get()
        try:
            # Poison pill: stop this worker thread.
            if url is None:
                break
            r = do_some_work(url, a, b)
            logging.info("Result: %s", r)
        except ValueError as ex:
            # Expected, simulated failure raised by do_some_work.
            logging.error("%s", ex)
        except Exception as ex:
            # Keep the worker alive, but record the full traceback.
            logging.exception("Unexpected error: %s", ex)
        finally:
            # Runs on `break` too, so every get() is matched by task_done().
            q.task_done()
    logging.info("Finished working")
if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        # %(threadName)s shows which named worker produced each log line.
        format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
    )
    # Bounded queue: put() blocks once 50 items are pending, so the producer
    # cannot run arbitrarily far ahead of the workers.
    in_q = Queue(50)
    workers = [
        Thread(target=thread_worker_func, args=(in_q, 1, 10), name=f"MyWorkerThread-{i + 1}")
        for i in range(WORKER_NUM)
    ]
    # Plain loops for side effects (not list comprehensions).
    for w in workers:
        w.start()
    # Start distributing tasks.
    for _url in URLS:
        in_q.put(_url)
    # One poison pill per worker, so every worker exits its loop.
    for _ in workers:
        in_q.put(None)
    # Wait for every worker thread to finish.
    logging.info("Main Thread waiting for Worker Threads")
    for w in workers:
        w.join()
    logging.info("Workers joined")
    # All workers have been joined; no extra sleep is needed before exiting.
    logging.info("App finished")