python multithreading multiprocessing

Can I submit a task to ProcessPoolExecutor from inside a class instance?


I'm seeking help with a code challenge I'm facing.

In my code, there's a class that does the following:
(1) Updates self.current_time every second in a background thread.
(2) Displays the current time.

What I'm trying to achieve is creating numerous instances of this class and printing the current time for each of them using ProcessPoolExecutor.

Here's the initial code:

import time
import threading
import concurrent.futures

class time_print_class:
    def __init__(self, tid):
        self.tid = tid
        time_thread = threading.Thread(target=self.update_time)
        time_thread.start()
        self.current_time = 0

    def update_time(self):
        while True:
            self.current_time = time.time()
            time.sleep(1)

    def print_time(self):
        # Although it shows print_time, it actually performs more complex tasks in my code. Therefore, it should be executed via multiprocessing.
        print(self.current_time, "on", self.tid)


if __name__ == "__main__":
    ### Initialize
    time_print_insts = [time_print_class(n) for n in range(60)]

    ### Multiprocess
    executor = concurrent.futures.ProcessPoolExecutor()
    while True:
        for tpi in time_print_insts:
            future = executor.submit(tpi.print_time)
        time.sleep(1)

Surprisingly, this code seems to work in Python 3.9, but it has a flaw. I don't want the timing of the print_time function to be dictated by the main routine. Instead, I want the print_time task to be submitted to the executor whenever update_time updates the time within the class.

Here's my failed attempt:

import time
import threading
import concurrent.futures

class time_print_class:
    def __init__(self, tid, executor):
        self.tid = tid
        self.executor = executor
        time_thread = threading.Thread(target=self.update_time)
        time_thread.start()
        self.current_time = 0

    def update_time(self):
        while True:
            self.current_time = time.time()
            self.future = executor.submit(self.print_time)
            time.sleep(1)

    def print_time(self):
        print(self.current_time, "on", self.tid)


if __name__ == "__main__":
    ### Multiprocess
    executor = concurrent.futures.ProcessPoolExecutor()

    ### Initialize
    time_print_insts = [time_print_class(n, executor) for n in range(60)]

The error I get from self.future is: cannot pickle '_thread.lock' object.
I suspect this arises from the interplay between threads and multiprocessing.
The reason I'm not content with my initial code is that in my actual scenario, update_time in time_print_class resembles a websocket, and print_time is a function that should follow the websocket update to do some complex stuff (like displaying time, in this case), not driven by a regular timer in the main routine.
Although I could call print_time directly after self.current_time = time.time(), running it on a single core across many instances would be too slow in my real scenario (update_time might fire simultaneously for all instances).
Do you have any suggestions for a better solution? I'd greatly appreciate your assistance with this. Thank you.


Solution

  • When you submit a bound method, the entire object it belongs to has to be serialized (pickled) and sent to the worker process. Split your class in two so that the Thread and ProcessPoolExecutor members never need to be serialized: keep only plain data in the class whose methods the worker executes.

    import time
    import threading
    import concurrent.futures
    
    class time_print_info_class:
        def __init__(self, current_time, tid):
            self.current_time = current_time
            self.tid = tid
        def print_time(self):
            print(self.current_time, "on", self.tid, flush=True)
    
    class time_print_class:
        def __init__(self, tid, executor):
            self.executor = executor
            time_thread = threading.Thread(target=self.update_time)
            self.time_info = time_print_info_class(time.time(), tid)
            time_thread.start()
            print("thread started")
    
        def update_time(self):
            while True:
                self.time_info.current_time = time.time()
                # Only self.time_info is pickled and sent to the worker, not this object.
                self.future = self.executor.submit(self.time_info.print_time)
                time.sleep(1)
                # self.future.result()  # you need this at some point to avoid a small memory leak.
    
    if __name__ == "__main__":
        ### Multiprocess
        executor = concurrent.futures.ProcessPoolExecutor()
    
        ### Initialize
        time_print_insts = [time_print_class(n, executor) for n in range(60)]
        time.sleep(100)
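
To see why the split helps, here is a minimal sketch that checks picklability directly with the pickle module, which is the same serialization the process pool relies on when a task is submitted. The names TimeInfo, Updater, and can_pickle are hypothetical and exist only for this illustration; the lock is a stand-in for the Thread and executor members in the code above.

```python
import pickle
import threading
import time


class TimeInfo:
    """Plain data only, so instances pickle cleanly."""

    def __init__(self, tid, current_time):
        self.tid = tid
        self.current_time = current_time

    def describe(self):
        return f"{self.current_time} on {self.tid}"


class Updater:
    """Holds an unpicklable member (a lock) next to the data object."""

    def __init__(self, tid):
        # Stand-in for a Thread or ProcessPoolExecutor attribute.
        self.lock = threading.Lock()
        self.info = TimeInfo(tid, time.time())

    def describe(self):
        return self.info.describe()


def can_pickle(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


updater = Updater(tid=0)

# Pickling a bound method pickles the instance it is bound to,
# so the lock inside `updater` gets dragged along and the call fails.
heavy_ok = can_pickle(updater.describe)

# The data-only object carries just tid and current_time.
light_ok = can_pickle(updater.info.describe)

print("bound method of Updater picklable:", heavy_ok)
print("bound method of TimeInfo picklable:", light_ok)
```

A quick check like can_pickle(obj.method) before calling executor.submit can help locate which attribute is blocking serialization in a larger class.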