Search code examples
Tags: python, python-3.x, multithreading, threadpoolexecutor, concurrent.futures

Identify current thread in concurrent.futures.ThreadPoolExecutor


The following code has 5 workers; each one runs its own worker_task() call:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_url = {executor.submit(worker_task, command_, site_): site_ for site_ in URLS}

    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try: data = future.result()

But inside each worker_task() call, I cannot identify which of the 5 workers is currently running it (a worker ID).

If I want to print('worker 3 has finished') inside worker_task(), I cannot, because executor.submit() gives the task no way of knowing which worker it is running on.

Any solutions?


Solution

  • You can get the name of the current worker thread via threading.current_thread() (read its .name attribute). See the example below:

    from concurrent.futures import ThreadPoolExecutor, Future
    from threading import current_thread
    from time import sleep
    from random import randint
    
    # Stand-in "URLs" for the demo: the integers 0..99.
    URLS = list(range(100))
    
    
    def do_some_work(url, a, b, delay=2):
        """Simulate a unit of work and tag the result with the current thread's name.

        Sleeps for *delay* seconds, then draws a random integer in [a, b].

        Args:
            url: Item being "processed"; echoed back in the result.
            a: Inclusive lower bound for the random number.
            b: Inclusive upper bound for the random number.
            delay: Seconds to sleep to simulate work; defaults to 2.

        Returns:
            A line of the form "<thread name>||: <url>_<number>\\n".

        Raises:
            ValueError: If the random number drawn is 5.
        """
        sleep(delay)
        rand_num = randint(a, b)
        if rand_num == 5:
            raise ValueError("No! 5 found!")
        # Thread.getName() is deprecated since Python 3.10; the .name attribute
        # is the supported way to read the thread's name.
        return f"{current_thread().name}||: {url}_{rand_num}\n"
    
    
    def show_fut_results(fut: Future):
        """Done-callback: print the future's result, or its error if the task raised.

        Intended for Future.add_done_callback(). With a ThreadPoolExecutor this
        usually runs in the worker thread that finished the task, so the thread
        name printed on error identifies that worker.

        Args:
            fut: A completed (or cancelled) future.
        """
        # A done-callback also fires for cancelled futures, and calling
        # exception() on one raises CancelledError, so handle that first.
        if fut.cancelled():
            print(f"{current_thread().name}|  Cancelled\n")
            return
        err = fut.exception()  # query once instead of twice
        if err is None:
            print(fut.result())
        else:
            # Thread.getName() is deprecated since Python 3.10; use .name.
            print(f"{current_thread().name}|  Error: {err}\n")
    
    
    if __name__ == '__main__':
        # Fan the fake URLs out over 10 pool threads; each future reports its
        # own outcome through the done-callback.
        with ThreadPoolExecutor(max_workers=10) as executor:
            for url in URLS:
                future = executor.submit(do_some_work, url, 1, 10)
                future.add_done_callback(show_fut_results)
    

    If you want more control over the threads, use the threading module directly:

    from threading import Thread
    from queue import Queue
    from time import sleep
    from random import randint
    import logging
    
    # Placeholder "URLs" for the demo: "URL-0" .. "URL-99".
    URLS = ["URL-" + str(n) for n in range(100)]

    # How many worker threads the main block starts.
    WORKER_NUM = 10
    
    
    def do_some_work(url: str, a: int, b: int, delay: float = 2) -> str:
        """Simulate processing *url* and return a short result string.

        Sleeps for *delay* seconds, then draws a random integer in [a, b].

        Args:
            url: Item being "processed"; echoed back in the result.
            a: Inclusive lower bound for the random number.
            b: Inclusive upper bound for the random number.
            delay: Seconds to sleep to simulate work; defaults to 2.

        Returns:
            A string of the form "<url> = <number>".

        Raises:
            ValueError: If the random number drawn is 5.
        """
        sleep(delay)
        rand_num = randint(a, b)
        if rand_num == 5:
            raise ValueError(f"No! 5 found in URL: {url}")
        return f"{url} = {rand_num}"
    
    
    def thread_worker_func(q: Queue, a: int, b: int) -> None:
        """Worker-thread loop: process URLs from *q* until a ``None`` item arrives.

        Each URL is passed to do_some_work(url, a, b). Results are logged at INFO
        level and failures at ERROR level, so one failing item does not stop
        the worker.

        Args:
            q: Queue of URLs to process; a ``None`` item (poison pill) tells this
                worker to exit.
            a: Lower bound forwarded to do_some_work.
            b: Upper bound forwarded to do_some_work.
        """
        logging.info("Started working")
        while True:
            url = q.get()

            # Poison pill: the main block enqueues one None per worker to stop it.
            if url is None:
                break

            # Keep the try narrow: only the work itself is expected to fail.
            try:
                r = do_some_work(url, a, b)
            except ValueError as ex:
                # Expected, simulated failure from do_some_work.
                logging.error("%s", ex)
            except Exception as ex:
                # Top-level boundary of the thread: log with traceback and keep
                # going rather than letting the worker die.
                logging.exception("Unexpected error: %s", ex)
            else:
                logging.info("Result: %s", r)

        logging.info("Finished working")
    
    
    if __name__ == '__main__':
        logging.basicConfig(
            level=logging.INFO,
            format="%(levelname)s | %(threadName)s | %(asctime)s | %(message)s",
        )
        # Bounded queue: put() blocks once 50 URLs are waiting, so the producer
        # cannot run arbitrarily far ahead of the workers.
        in_q = Queue(50)
        workers = [
            Thread(target=thread_worker_func, args=(in_q, 1, 10), name=f"MyWorkerThread-{i + 1}")
            for i in range(WORKER_NUM)
        ]
        # Plain loops for side effects; a list comprehension here would only
        # build a throwaway list of None.
        for w in workers:
            w.start()

        # Start distributing tasks.
        for _url in URLS:
            in_q.put(_url)

        # One poison pill per worker so that every worker loop terminates.
        for _ in workers:
            in_q.put(None)

        # Wait for all worker threads to finish.
        logging.info("Main Thread waiting for Worker Threads")
        for w in workers:
            w.join()

        logging.info("Workers joined")
        logging.info("App finished")