
How to get the name of the analyzed completed future in concurrent.futures?


While looking for an answer to my question (a solution is available in one answer in the linked duplicate), I discovered concurrent.futures, and specifically concurrent.futures.as_completed.

The code I ended up with works fine (for context: I needed to catch, in the main program, exceptions raised in threads started from that main program), except that I do not know how to recover the name of the function being examined in concurrent.futures.as_completed:

import concurrent.futures

class Checks:

    @staticmethod
    def isok():
        print("OK")

    @staticmethod
    def isko():
        raise Exception("KO")

# db maps each method name in Checks to its thread handler (a Future)
db = {}

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    # get the names of the methods in Checks, discarding the built-in ones
    for check in [k for k in dir(Checks) if not k.startswith('_')]:
        db[check] = executor.submit(getattr(Checks, check))

# at that point I have a map of function_name_as_a_string -> thread handler
for future in concurrent.futures.as_completed([db[k] for k in db.keys()]):
    if future.exception():
        print(f"{str(future.exception())} was raised (I do not know where, somewhere)")
    else:
        print("success (again, I do not know where, exactly)")

# all the threads are finished at that point
print("all threads are done")

In the success/failure outputs, I know whether a method raised an exception or not. What I do not know is which method did what.

Since the iteration has to be over the thread handlers, I could not find a way to "inject" the method name into the loop to use it in the print(). How can I do that?


Solution

  • A possible solution is to invert the map so that each handler points back to its name. Pairing db.keys() with as_completed() through zip() is not reliable, because as_completed() yields futures in the order they finish, not in the order they were submitted:

    import concurrent.futures
    
    class Checks:
    
        @staticmethod
        def isok():
            print("OK")
    
        @staticmethod
        def isko():
            raise Exception("KO")
    
    # db maps each method name in Checks to its thread handler (a Future)
    db = {}
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        for check in [k for k in dir(Checks) if not k.startswith('_')]:
            db[check] = executor.submit(getattr(Checks, check))
    
    # invert the map: Future objects are hashable, so they can be dict keys
    names = {handler: name for name, handler in db.items()}
    
    for handler in concurrent.futures.as_completed(names):
        name = names[handler]
        if handler.exception():
            print(f"{handler.exception()} was raised by {name}")
        else:
            print(f"success in {name}")
    
    # all the threads are finished at that point
    print("all threads are done")
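
Because as_completed() yields futures in the order they finish, the name has to travel with the future rather than with its position in a list. Below is a minimal sketch of that idea with several workers and tasks of different durations, keyed by the future itself. The names slow, fast, failing and run_named are hypothetical, introduced only for illustration, and the sleep times are arbitrary:

```python
import concurrent.futures
import time


def slow():
    # Hypothetical task that is expected to finish last.
    time.sleep(0.3)
    return "slow result"


def fast():
    # Hypothetical task that is expected to finish first.
    time.sleep(0.05)
    return "fast result"


def failing():
    # Hypothetical task that raises, like isko() in the question.
    time.sleep(0.15)
    raise RuntimeError("KO")


def run_named(tasks):
    """Run each callable in its own thread.

    Returns a list of (name, outcome) tuples in completion order.
    """
    outcomes = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(tasks)) as executor:
        # Key the map by the future so the name can be looked up later.
        future_to_name = {executor.submit(func): name for name, func in tasks.items()}
        for future in concurrent.futures.as_completed(future_to_name):
            name = future_to_name[future]
            error = future.exception()
            if error is not None:
                outcomes.append((name, f"raised {error}"))
            else:
                outcomes.append((name, future.result()))
    return outcomes


results = run_named({"slow": slow, "fast": fast, "failing": failing})
for name, outcome in results:
    print(f"{name}: {outcome}")
```

The tasks are submitted as slow, fast, failing, but they will normally be reported in a different order. The lookup in future_to_name still attaches the right name to each outcome, which a position-based pairing cannot guarantee.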