
How do I get a value returned by a thread/subprocess inside a loop that checks whether the threads/subprocesses should stay active or be stopped?


#import threading
import multiprocessing
import time

condition = True
finish_state = False
x = 0
all_processes = []

def operation_process(x):
    #many operations with x value that if they take a long time will be closed because the other thread will set the boolean to True and close all open threads
    time.sleep(20) #for example these operations with x take 20 seconds in this case

def finish_process(finish_state):
    finish_state = True
    time.sleep(5) #time to send the boolean that terminates all started threads
    return finish_state

def main_process(condition, finish_state, x):

    while condition == True:
        
        #Create a new thread and fork the code()
        #THIS IS THE LINE WHERE I AM SUPPOSED TO BE EXPECTING TO RECEIVE THE RETURN VALUE, AT THE CYCLE NUMBER IN WHICH THE FUNCTION finish_state() TIME HAS FINISHED...
        #thr1 = threading.Thread(target=finish_process, args=(finish_state, ))
        thr1 = multiprocessing.Process(target=finish_process, args=(finish_state, ))

        #thr2 = threading.Thread(target=operation_process, args=(x, ))
        thr2 = multiprocessing.Process(target=operation_process, args=(x, ))

        x = x + 1
        print("cycle number: " + str(x))
        
        #Only in the first cycle
        if(x == 1):
            thr1.start()
            thr2.start()
            all_processes.append(thr1)
            all_processes.append(thr2)

            #print(threading.activeCount()) #to list open threads
            print(multiprocessing.active_children()) #to list open subprocesses

        if(finish_state == True):
            print("loop ends!")
            for process in all_processes:
                process.terminate()
            condition = False

main_process(condition, finish_state, x)

print("continue the code with other things after all threads/processes except the main one were closed with the loop that started them... ")

How do I receive the value finish_state = True returned by the function finish_process() that is inside the thread?

The goal is for the main thread to call main_process(), which contains a loop that simulates a do-while loop. A thread is created only during the first iteration of that loop (that is, when x = 1), and the loop should keep repeating until the thread returns finish_state = True. The loop in the main thread then ends, because finish_state is no longer False.

The problem is that I need `` to run only on the first iteration, because otherwise a new thread would be created on every iteration. At the same time, I have to keep waiting for that thread to return finish_state = True at some point.

I decided to switch from threads to subprocesses because they are easier to kill. Threads share variables, but I could not find a way to kill them selectively, so I use the terminate() method to kill processes instead. I'm not sure this is the best approach, because I still need to get the finish_state signal to the main process or thread somehow, so that it can decide whether to close all the other threads or processes, except itself.


Edit: When I try Frank Yellin's code, I get this error in the console:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 125, in _main
    prepare(preparation_data)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 236, in prepare
    _fixup_main_from_path(data['init_main_from_path'])
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 287, in _fixup_main_from_path
    main_content = runpy.run_path(main_path,
  File "C:\Users\Maty0\Anaconda3\lib\runpy.py", line 265, in run_path
    return _run_module_code(code, init_globals, run_name,
  File "C:\Users\Maty0\Anaconda3\lib\runpy.py", line 97, in _run_module_code
    _run_code(code, mod_globals, init_globals,
  File "C:\Users\Maty0\Anaconda3\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\Maty0\Desktop\P\Python\Threads\thread_way.py", line 36, in <module>
    main_process(finish_state)
  File "C:\Users\Maty0\Desktop\P\Python\Threads\thread_way.py", line 26, in main_process
    thr1.start()
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\Maty0\Anaconda3\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

Solution

  • I think you're going about this completely wrong. I'm also not quite sure why you have global variables that then get passed as arguments to functions, instead of just having local variables.

    In any case you're looking for something like this. You need to make finish_state into an event, which can be set by one thread and observed by other threads.

    import multiprocessing
    import time

    def operation_process(x):
        # Stand-in for many operations on x. If they take too long, the main
        # process terminates this process once the event has been set.
        time.sleep(20)  # for example, these operations take 20 seconds

    def finish_process(finish_state):
        time.sleep(5)  # delay before signalling that everything should stop
        finish_state.set()

    def main_process(finish_state):
        # Process (unlike threading.Thread) supports terminate().
        thr1 = multiprocessing.Process(target=finish_process, args=(finish_state,))
        thr1.start()

        thr2 = multiprocessing.Process(target=operation_process, args=(0,))
        thr2.start()

        finish_state.wait()  # blocks until finish_process calls set()

        for process in [thr1, thr2]:
            process.terminate()
            process.join()  # reap the terminated process

    if __name__ == '__main__':
        # The guard keeps child processes from re-running this code when the
        # main module is imported again (spawn start method, e.g. on Windows).
        finish_state = multiprocessing.Event()
        main_process(finish_state)
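
The question mentions that threads cannot be killed selectively. As a rough sketch of the same event idea with `threading`, assuming the long-running work can be split into small steps, the worker can check the event between steps and return on its own, since threads have no `terminate()`. The names `worker`, `stopper`, `run_demo`, and the timings are illustrative and not part of the original code.

```python
import threading
import time


def worker(stop_event, steps, step_seconds, progress):
    """Do up to `steps` units of work, stopping early once stop_event is set."""
    for i in range(steps):
        if stop_event.is_set():
            return  # cooperative exit: the thread ends itself
        time.sleep(step_seconds)  # stand-in for one unit of real work
        progress.append(i)


def stopper(stop_event, delay):
    """Signal every observer of stop_event after `delay` seconds."""
    time.sleep(delay)
    stop_event.set()


def run_demo(steps=200, step_seconds=0.01, delay=0.1):
    stop_event = threading.Event()
    progress = []  # only the worker appends; main reads it after join()
    threads = [
        threading.Thread(target=worker, args=(stop_event, steps, step_seconds, progress)),
        threading.Thread(target=stopper, args=(stop_event, delay)),
    ]
    for t in threads:
        t.start()
    stop_event.wait()  # block until stopper sets the event
    for t in threads:
        t.join()  # the worker returns within about one step of the signal
    return len(progress)


if __name__ == "__main__":
    done = run_demo()
    print(f"worker finished {done} of 200 steps before being stopped")
```

The trade-off is that the worker only notices the signal between steps, so a single very long blocking call would still run to completion. In that case, separate processes with `terminate()` remain the more direct option.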
    

    If you want a status report every second until finished, replace the finish_state.wait() call with:

        while not finish_state.wait(timeout=1):
            # wait() returns False when the timeout expires before the event is set
            print("still running...")
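
An event only carries a signal. If the goal is to receive an actual return value from the function, one option is `concurrent.futures` from the standard library: `submit()` returns a `Future`, and `Future.result()` hands back whatever the function returned. The sketch below polls with a timeout so that it can print a status line while waiting. The function `slow_square`, the helper `run_with_status`, and the timings are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout


def slow_square(n, seconds):
    """Hypothetical long-running task that returns a value."""
    time.sleep(seconds)
    return n * n


def run_with_status(n=7, seconds=0.3, poll=0.1):
    reports = 0
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(slow_square, n, seconds)
        while True:
            try:
                # result() raises on timeout instead of returning a sentinel
                value = future.result(timeout=poll)
                break
            except FutureTimeout:
                reports += 1
                print(f"still running ({reports})")
    return value, reports


if __name__ == "__main__":
    value, reports = run_with_status()
    print(f"returned {value} after {reports} status reports")
```

This version does not stop the task early. It only shows how the return value travels back to the caller, and it would need to be combined with an event or with process termination if cancellation is also required.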