Search code examples
pythonthreadpoolwaitthreadpoolexecutorconcurrent.futures

with concurrent.futures.ThreadPoolExecutor() as executor: ... does not wait


I am trying to use the ThreadPoolExecutor() in a method of a class to create a pool of threads that will execute another method within the same class. I have the with concurrent.futures.ThreadPoolExecutor()... however it does not wait, and an error is thrown saying there was no key in the dictionary I query after the "with..." statement. I understand why the error is thrown because the dictionary has not been updated yet because the threads in the pool did not finish executing. I know the threads did not finish executing because I have a print("done") in the method that is called within the ThreadPoolExecutor, and "done" is not printed to the console.

I am new to threads, so if any suggestions on how to do this better are appreciated!

    def tokenizer(self):
        all_tokens = []
        self.token_q = Queue()
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            for num in range(5):
                executor.submit(self.get_tokens, num)
            executor.shutdown(wait=True)

        print("Hi")
        results = {}
        while not self.token_q.empty():
            temp_result = self.token_q.get()
            results[temp_result[1]] = temp_result[0]
            print(temp_result[1])
        for index in range(len(self.zettels)):
            for zettel in results[index]:
                all_tokens.append(zettel)
        return all_tokens

    def get_tokens(self, thread_index):
        print("!!!!!!!")
        switch = {
            0: self.zettels[:(len(self.zettels)/5)],
            1: self.zettels[(len(self.zettels)/5): (len(self.zettels)/5)*2],
            2: self.zettels[(len(self.zettels)/5)*2: (len(self.zettels)/5)*3],
            3: self.zettels[(len(self.zettels)/5)*3: (len(self.zettels)/5)*4],
            4: self.zettels[(len(self.zettels)/5)*4: (len(self.zettels)/5)*5],
        }
        new_tokens = []
        for zettel in switch.get(thread_index):
            tokens = re.split('\W+', str(zettel))
            tokens = list(filter(None, tokens))
            new_tokens.append(tokens)
        print("done")
        self.token_q.put([new_tokens, thread_index])

'''

Expected to see all print("!!!!!!") and print("done") statements before the print ("Hi") statement. Actually shows the !!!!!!! then the Hi, then the KeyError for the results dictionary.


Solution

  • You need to loop over concurrent.futures.as_completed() as shown here. It will yield values as each thread completes.