python, python-3.x, multithreading, multiprocessing

Getting returned values in multiprocessing does not work


I want to control the execution of a function by checking its runtime and memory consumption. I have the following code:

import os
import time
import psutil
from multiprocessing import Process, SimpleQueue


def my_fun(a, b, c, q):
    l = []
    i = 2
    for _ in range(a * b * c):
        l.append(i ** 50)
        i *= 1024

    q.put((a, b, c))


def check(pid, available_mem, start_time, allowed_time, q):
    while True:
        try:
            if not psutil.pid_exists(pid):
                print('Function executed successfully')
                return True

            if psutil.virtual_memory()[1] < available_mem * (10 ** 9):
                os.kill(pid, 9)
                q.put(False)
                print('Memory usage violation')
                break

            if time.time() - start_time > allowed_time:
                os.kill(pid, 9)
                q.put(False)
                print('Time out')
                break

            time.sleep(1)

        except Exception as e:
            q.put(False)
            print(e)
            break


if __name__ == "__main__":
    minimum_available_memory = 5  # in GigaByte
    time_allowed = 5  # in seconds

    my_fun_q = SimpleQueue()
    check_q = SimpleQueue()
    arg1, arg2, arg3 = 1, 1, 1

    my_fun_process = Process(target=my_fun, args=(arg1, arg2, arg3, my_fun_q))
    start_time = time.time()
    my_fun_process.start()
    pid = my_fun_process.pid

    check_process = Process(target=check, args=(pid, minimum_available_memory, start_time, time_allowed, check_q))
    check_process.start()
    my_fun_process.join()
    check_process.join()

    my_fun_result = my_fun_q.get()
    check_result = check_q.get()

    print(check_result)
    if check_result:
        print(my_fun_result)


In my code, my_fun() is the function whose runtime or memory consumption may be high, depending on the given arguments.

If my_fun finishes successfully, check should return True and my_fun should return a tuple of three values. But if the runtime exceeds 5 seconds or the available memory drops below 5 GB, the check process should kill the my_fun process, and only check should return False. This is what I expect my code to do. However,

  • If arg1, arg2, arg3 = 1, 1, 1 are passed to my_fun, the program prints "Function executed successfully" but then hangs while retrieving the returned values and never terminates.
  • If arg1, arg2, arg3 = 100000, 100000, 100000 are passed to my_fun, the program prints "Time out" but again hangs, neither returning anything nor terminating.

This shows that the check function is doing its job correctly but there is something wrong with returning the values. How do I fix my code?


Solution

  • If you add

        print(my_fun_q.empty(), check_q.empty())
    

    before your get() statements, you can see what's happening.

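    If you call queue.get() and there is nothing in the queue, it's going to hang until someone adds something to the queue. In your case, that's forever. In the first case, check prints its message and returns True without putting anything into check_q, so check_q.get() blocks. In the second case, my_fun is killed before it reaches q.put(), so my_fun_q.get() blocks.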

    Your best bet would be to rewrite check() so that it always puts something into the queue: True if the function succeeds and False if it doesn't. The value returned by a Process target is discarded, so the existing return True never reaches the parent. Your code can then read from check_q and, depending on what it reads, optionally read from my_fun_q.
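
To make the idea concrete, here is a minimal sketch of the "always produce a status, then read the result queue only on success" pattern. It is not a drop-in replacement for the code in the question. As a simplification, the monitoring loop runs in the parent instead of a separate check process, and the memory check is left out so that the example needs only the standard library. The names quick_job, slow_job and run_with_limits are made up for illustration.

```python
import time
from multiprocessing import Process, SimpleQueue


def quick_job(a, b, c, q):
    """Hypothetical stand-in for my_fun: finishes quickly and reports a result."""
    q.put((a, b, c))


def slow_job(seconds, q):
    """Hypothetical stand-in for a call that exceeds the time limit."""
    time.sleep(seconds)
    q.put("finished")


def run_with_limits(target, args, allowed_time, poll_interval=0.05):
    """Run target(*args, q) in a child process.

    Returns (True, result) if the child delivered a result in time and
    (False, None) otherwise, so the caller never blocks on an empty queue.
    """
    q = SimpleQueue()
    child = Process(target=target, args=(*args, q))
    child.start()
    deadline = time.monotonic() + allowed_time

    while time.monotonic() < deadline:
        if not q.empty():
            result = q.get()  # safe: data is already waiting in the queue
            child.join()
            return True, result
        if not child.is_alive():
            break  # the child exited on its own (possibly after a crash)
        # A memory test like the one in the question could be added here;
        # it is omitted to keep this sketch free of third-party imports.
        time.sleep(poll_interval)

    if child.is_alive():
        child.kill()  # time limit exceeded
        child.join()
        return False, None

    child.join()
    if not q.empty():  # the result arrived just before the child exited
        return True, q.get()
    return False, None


if __name__ == "__main__":
    print(run_with_limits(quick_job, (1, 1, 1), allowed_time=5))
    print(run_with_limits(slow_job, (30,), allowed_time=1))
```

The status and the result travel together in the returned tuple, so there is no second queue that can be left empty. If you prefer to keep a separate check process, the same rule applies: every exit path of check, including the success path, should call q.put() exactly once.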