Search code examples
pythonmultithreadingconcurrencythreadpoolexecutor

Python Concurrency ThreadPoolExecutor - stop execution if condition is met


I am stuck with trying to multi-thread using the ThreadPoolExecutor, whereby the execution is to be safely stopped if one of the processes meets a criteria, so that remaining threads do not have to be finalised.

I have got the following concept, but it does not work, it keeps just running:

import time, sys
from concurrent.futures import ThreadPoolExecutor

data = {"future1": 'blank', "future2": 'blank', "future3": 'blank'}

def function1(n):
   global data #not sure if this is necessary, as it seems to be able to access this anyway?
   print(n)
   time.sleep(1)
   data['future1'] = n

def function2(n):
   global data
   print(n)
   time.sleep(2)
   data['future2'] = n

def function3(n):
   global data
   print(n)
   time.sleep(3)
   data['future3'] = n

with ThreadPoolExecutor(max_workers=4) as executor:
  while True:
    future1=executor.submit(function1, 'test1')
    future2=executor.submit(function2, 'test2')
    future3=executor.submit(function3, 'test3')

    if data['future2']!='blank':
      executor.shutdown(wait=False)
      sys.exit()

Not sure what I am doing wrong here, any help will be appreciated.


Solution

  • Here is the full answer, turns out that executor.shutdown(wait=False) is not the way to go as mentioned by Sachin. Full credit to https://gist.github.com/clchiou/f2608cbe54403edb0b13

    import time, sys
    from concurrent.futures import ThreadPoolExecutor
    import concurrent.futures.thread
    
    data = {"future1": None, "future2": None, "future3": None}
    
    def function1(n):
       time.sleep(1)
       data['future1'] = n
       print(n)
    
    def function2(n):
       time.sleep(2)
       data['future2'] = n
       print(n)
    
    def function3(n):
       time.sleep(3)
       data['future3'] = n
       print(n)
    
    with ThreadPoolExecutor(max_workers=4) as executor:
      executor.submit(function1, 'test1')
      executor.submit(function2, 'test2')
      executor.submit(function3, 'test3')
    
      while True:
        if any(v is not None for v in data.values()):
          executor._threads.clear()
          concurrent.futures.thread._threads_queues.clear()
          break
    
    print(data)