Search code examples
python tkinter multiprocessing concurrent.futures

Python: Update Tkinter while using Multiprocessing


I am having the following issue:

I am trying to use concurrent.futures.ProcessPoolExecutor(), or anything similar, and show the progress of each process on a tkinter widget.

There is this answer: "Python Tkinter multiprocessing progress", but I cannot quite get it to work.

The following simplified version of my code seems to work only when using ThreadPoolExecutor() which I do not want.

Thanks in advance for any help!

import concurrent.futures
import tkinter
import tkinter.ttk
import multiprocessing
import random
import time


class App:
    """Window with one progress label per worker; the jobs run on creation."""

    def __init__(self, root):
        """Build the labels, queues and input values, then start processing.

        root is the Tk window the labels are packed into.
        """
        self.root = root

        self.processes = 5
        self.percentage = []        # one StringVar per worker, displayed by its label
        self.changing_labels = []   # the Label widgets, in worker order
        self.queues = []            # one progress queue per worker
        self.values = []            # one list of input numbers per worker

        for i in range(self.processes):
            temp_percentage = tkinter.StringVar()
            temp_percentage.set("0 %")
            self.percentage.append(temp_percentage)

            temp_changing_label = tkinter.Label(self.root, textvariable=temp_percentage)
            temp_changing_label.pack()
            self.changing_labels.append(temp_changing_label)

            self.queues.append(multiprocessing.Queue())
            # Just some values that I want to do calculations on
            temp_value = []
            for ii in range(12):
                temp_value.append(random.randrange(10))
            self.values.append(temp_value.copy())

        # Processing starts (and finishes) inside the constructor.
        self.start_processing()

    def start_processing(self):
        """Submit one job per worker, poll their queues to update the labels,
        and destroy the window once every job reports done."""

        def save_values(my_values):     # Save my new calculated values on the same file or different file
            # Appends one line to example.txt: each value followed by a space.
            with open(f"example.txt", "a") as file:
                for v in my_values:
                    file.write(str(v))
                    file.write(" ")
                file.write("\n")

        def work(my_values, my_queue):  # Here I do all my work
            # Some values to calculate my progress so that I can update my Labels
            my_progress = 0
            step = 100 / len(my_values)
            # Do some work on the values
            updated_values = []
            for v in my_values:
                time.sleep(0.5)
                updated_values.append(v + 1)

                my_progress += step
                my_queue.put(my_progress)   # Add current progress to queue

            save_values(updated_values)     # Save it before exiting

        # This part does not work with ProcessPoolExecutor, with ThreadPoolExecutor it works fine
        # NOTE: `work` is defined inside this method; with ProcessPoolExecutor this
        # fails with "AttributeError: Can't pickle local object
        # 'App.start_processing.<locals>.work'".  Once `work` is moved out, passing a
        # multiprocessing.Queue fails with "RuntimeError: Queue objects should only be
        # shared between processes through inheritance".
        with concurrent.futures.ProcessPoolExecutor() as executor:
            results = [executor.submit(work, self.values[i], self.queues[i])
                       for i in range(self.processes)]
            # Run in a loop and update Labels or exit when done
            # (the loop polls continuously; there is no sleep between passes)
            while True:
                results_done = [result.done() for result in results]

                if False in results_done:
                    for i in range(self.processes):
                        if results_done[i] is False:
                            # Still running: show at most one new progress value per pass.
                            if not self.queues[i].empty():
                                temp_queue = self.queues[i].get()
                                self.percentage[i].set(f"{temp_queue:.2f} %")
                        else:
                            # done() is also True for a job that raised, so a failed
                            # job is displayed as "100 %" as well.
                            self.percentage[i].set("100 %")
                        # Let Tk redraw, since mainloop() is not running yet.
                        self.root.update()
                else:
                    break
        # Close window at the very end
        self.root.destroy()


def main():  # Please do not change my main unless it is essential
    """Create the Tk root window, build the App on it and enter the main loop."""
    root = tkinter.Tk()
    # App.__init__ runs all the processing before returning.
    my_app = App(root)
    root.mainloop()


# Start the GUI only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Solution

  • The problem is caused by an exception being thrown inside the new processes. To see this exception, you can call the Future.result function, which raises the exception, e.g.

    print([result.result() for result in results])
    

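    This gives the error `AttributeError: Can't pickle local object 'App.start_processing.<locals>.work'`. `ProcessPoolExecutor` has to pickle the submitted callable in order to send it to a worker process, and a function defined inside another function cannot be pickled, so the problem is that `work` was defined inside `start_processing`.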

    After moving the work and save_values functions out of the start_processing method, the error changes to RuntimeError: Queue objects should only be shared between processes through inheritance. This can be fixed by using a multiprocessing.Manager to create the queues:

    
    class App:
        def __init__(self, root):
            ...

            # start_processing() is called inside the with block, i.e. while the
            # manager that owns the queues is still open.
            with multiprocessing.Manager() as manager:
                for i in range(self.processes):
                    ...

                    # manager.Queue() replaces multiprocessing.Queue(), which raised
                    # "Queue objects should only be shared between processes
                    # through inheritance" when passed to executor.submit().
                    self.queues.append(manager.Queue())
                ...
                self.start_processing()
    

    The print([result.result() for result in results]) debugging line can now be removed. The full code is as follows:

    import concurrent.futures
    import tkinter
    import tkinter.ttk
    import multiprocessing
    import random
    import time
    
    
    class App:
        """Tkinter window showing one progress label per worker process.

        Constructing the object builds the labels, runs every job in a
        ``ProcessPoolExecutor`` while keeping the labels up to date, and destroys
        the window once all jobs have finished.  The constructor therefore
        blocks until the work is done.
        """

        # Seconds to wait between two refreshes of the labels.  Without a pause
        # the polling loop would spin at full speed for the whole run.
        POLL_INTERVAL = 0.05

        def __init__(self, root):
            """Create the widgets, queues and input values, then run the jobs.

            Args:
                root: The ``tkinter.Tk`` window the labels are packed into.
            """
            self.root = root

            self.processes = 5
            self.percentage = []        # one StringVar per worker
            self.changing_labels = []   # the Label widgets, in worker order
            self.queues = []            # one progress queue per worker
            self.values = []            # one list of input numbers per worker

            # Plain multiprocessing.Queue objects cannot be handed to pool
            # workers; manager queues can.  start_processing() is called inside
            # the with block so the manager stays open while the jobs run.
            with multiprocessing.Manager() as manager:
                for _ in range(self.processes):
                    temp_percentage = tkinter.StringVar()
                    temp_percentage.set("0 %")
                    self.percentage.append(temp_percentage)

                    temp_changing_label = tkinter.Label(self.root, textvariable=temp_percentage)
                    temp_changing_label.pack()
                    self.changing_labels.append(temp_changing_label)

                    self.queues.append(manager.Queue())
                    # Just some values that I want to do calculations on
                    self.values.append([random.randrange(10) for _ in range(12)])

                self.start_processing()

        @staticmethod
        def save_values(my_values):
            """Append *my_values* to ``example.txt`` as one line.

            Every value is followed by a single space and the line ends with a
            newline, so an empty sequence appends an empty line.
            """
            line = "".join(f"{v} " for v in my_values) + "\n"
            with open("example.txt", "a") as file:
                file.write(line)

        @classmethod
        def work(cls, my_values, my_queue, delay=0.5):
            """Process *my_values* and report the progress after each value.

            Args:
                my_values: Numbers to process; may be empty.
                my_queue: Object with a ``put`` method; receives the progress in
                    percent after each processed value.
                delay: Seconds to sleep per value, standing in for real work.
            """
            total = len(my_values)
            updated_values = []
            for count, v in enumerate(my_values, start=1):
                time.sleep(delay)
                updated_values.append(v + 1)
                # Derive the percentage from the item count instead of summing a
                # float step: the last report is exactly 100.0 and an empty input
                # never divides by zero.
                my_queue.put(100 * count / total)

            cls.save_values(updated_values)  # Save it before exiting

        def _show_latest_progress(self, index):
            """Drain queue *index* and display its most recent progress value."""
            job_queue = self.queues[index]
            latest = None
            # This method is the queue's only consumer, so a get() that follows a
            # non-empty check cannot block.
            while not job_queue.empty():
                latest = job_queue.get()
            if latest is not None:
                self.percentage[index].set(f"{latest:.2f} %")

        def start_processing(self):
            """Run every job in its own process and mirror progress in the labels.

            Blocks until all jobs are done and then destroys the window.

            Raises:
                Exception: Whatever the first failed job raised.  It is re-raised
                    after the window is closed so that a failing worker is not
                    silently reported as finished.
            """
            with concurrent.futures.ProcessPoolExecutor() as executor:
                results = [executor.submit(self.work, self.values[i], self.queues[i])
                           for i in range(self.processes)]
                # Refresh the labels until every job has finished
                while not all(result.done() for result in results):
                    for i, result in enumerate(results):
                        if not result.done():
                            self._show_latest_progress(i)
                        elif result.exception() is None:
                            self.percentage[i].set("100 %")
                        else:
                            # done() is also True for a job that raised
                            self.percentage[i].set("failed")
                    # mainloop() is not running yet, so let Tk redraw explicitly
                    self.root.update()
                    time.sleep(self.POLL_INTERVAL)
            # Close window at the very end
            self.root.destroy()
            # Surface worker errors instead of discarding them
            for result in results:
                result.result()
    
    
    def main():  # Please do not change my main unless it is essential
        """Create the Tk root window, build the App on it and enter the main loop."""
        root = tkinter.Tk()
        # App.__init__ runs all the processing before returning.
        # NOTE(review): `my_app` looks unused, but the binding presumably keeps
        # the App and its StringVars referenced - confirm before removing it.
        my_app = App(root)
        root.mainloop()
    
    
    # Start the GUI only when this file is executed as a script, not when it is imported.
    # NOTE(review): with multiprocessing this guard presumably also keeps worker
    # processes that import this module from running main() again - confirm for
    # the start method in use.
    if __name__ == "__main__":
        main()
    

    PS: I would recommend `not all(results_done)` rather than `False in results_done`, and `not results_done[i]` rather than `results_done[i] is False`.