I'm trying to run many python threads (or separate processes) from the 'run' method of class GetDayMin. I want the threads or processes to run simultaneously, and after 40 seconds the class instance writes it's data (from each thread/proc) and exits. While I can start each thread without waiting on anything to complete, if I use the join method to wait on any thread, it could take a long time to timeout since successive threads may all be blocked. It seems the join method of both threading and multiprocessing will hang until the timeout.
For example, if in my class I start 5 threads and then wait 40 seconds in order of thread creation, the first thread could take 40 seconds to timeout, and then we go to the second thread with takes 40 seconds to timeout etc. We could end up waiting 200 seconds for 5 threads.
What I want is that no thread takes longer than 40 seconds, so the whole class instance lasts a maximum of 40 seconds too. I'm willing to do multiprocessing instead of multithreading if that makes things easier. What I really anticipate is that most threads will complete within 10 seconds but three or four may hang, and I don't want to wait for them. How can I accomplish this?
import multiprocessing
import pandas as pd
import random
import time
class GetDayMin:
def __init__(self):
self.results = pd.DataFrame() # Shared DataFrame to store results
def add_result(self, result):
self.results = self.results.append(result, ignore_index=True)
def process_function(self):
sleep_time = random.randint(30, 50) # Random sleep time between 30 to 50 seconds
time.sleep(sleep_time) # Pretend I'm calculating something
# Return the time slept to store in the results to simulate thread communication
return {'process_id': multiprocessing.current_process().pid, 'time_slept': sleep_time}
def run(self):
processes = []
for _ in range(30):
process = multiprocessing.Process(target=self.process_function)
processes.append(process)
# Start all processes
for process in processes:
process.start()
# Wait for all processes to finish or timeout after 40 seconds (each--unfortunately)
for process in processes:
process.join(timeout=40)
if process.is_alive():
process.terminate()
process.join() # wait on process--want this to be a collective 40 seconds
It seems to me that you can compute an absolute time expiration_time
when all submitted tasks should have completed and if any are still running after that time they should be terminated. The amount of time you specify on process.join()
can be computed from that expiration_time
and the current time:
...
def run(self):
processes = []
for _ in range(30):
process = multiprocessing.Process(target=self.process_function)
processes.append(process)
# Start all processes
for process in processes:
process.start()
# Wait for all processes to finish or timeout after 40 seconds
expiration_time = time.time() + 40
time_expired = False
for process in processes:
if time_expired:
process.terminate()
else:
wait_time = expiration_time - time.time()
if wait_time <= 0:
process.terminate()
time_expired = True
else:
process.join(timeout=wait_time)
if process.is_alive():
process.terminate()
time_expired = True
But Consider This
You can see my answer above, which only addresses your timing issue, but I also wanted to mention that if your actual process_function
calls method add_result
the results will not be what you expected because each process will see its own copy of the dataframe
. Also, you have method process_function
returning a value. But you cannot return a value back to the caller form a process instance that way. You might want the following:
import multiprocessing
import pandas as pd
import random
import time
class GetDayMin:
def __init__(self):
self.results = pd.DataFrame() # Shared DataFrame to store results
def add_result(self, result):
self.results = self.results.append(result, ignore_index=True)
def process_function(self, results_dict):
sleep_time = random.randint(30, 50) # Random sleep time between 30 to 50 seconds
time.sleep(sleep_time) # Pretend I'm calculating something
# Return the time slept to store in the results to simulate thread communication
results_dict[multiprocessing.current_process().pid] = sleep_time
def run(self):
with multiprocessing.Manager() as manager:
results_dict = manager.dict()
processes = [
multiprocessing.Process(target=self.process_function, args=(results_dict,))
for _ in range(30)
]
# Start all processes
for process in processes:
process.start()
# Wait for all processes to finish or timeout after 40 seconds
start_time = time.time()
expiration_time = start_time + 40
time_expired = False
for process in processes:
if time_expired:
process.terminate()
else:
wait_time = expiration_time - time.time()
if wait_time <= 0:
process.terminate()
time_expired = True
else:
process.join(timeout=wait_time)
if process.is_alive():
process.terminate()
time_expired = True
end_time = time.time()
print(f'Total elapsed time: {end_time - start_time} seconds')
for pid, sleep_time in results_dict.items():
print(f'Process {pid} slept for {sleep_time} seconds')
if __name__ == '__main__':
GetDayMin().run()
Prints:
Total elapsed time: 40.00496006011963 seconds
Process 500 slept for 30 seconds
Process 504 slept for 30 seconds
Process 466 slept for 32 seconds
Process 512 slept for 33 seconds
Process 461 slept for 34 seconds
Process 470 slept for 35 seconds
Process 455 slept for 36 seconds
Process 459 slept for 36 seconds
Process 491 slept for 36 seconds
Process 472 slept for 37 seconds
Process 507 slept for 37 seconds
Process 486 slept for 39 seconds
Process 463 slept for 40 seconds
Process 477 slept for 40 seconds
The same can be accomplished with a multiprocessing pool. In this case given your worker function process_function
if we have a pool of size N and we submit N tasks to the pool, each pool process will process a single task. The advantage of using a pool is that the worker function is able to directly return a result and we therefore don't need to use a managed dictionary.
import multiprocessing
import pandas as pd
import random
import time
class GetDayMin:
def process_function(self):
sleep_time = random.randint(30, 50) # Random sleep time between 30 to 50 seconds
time.sleep(sleep_time) # Pretend I'm calculating something
# Return the time slept to store in the results to simulate thread communication
return multiprocessing.current_process().pid, sleep_time
def run(self):
results = {}
n_tasks = 30
with multiprocessing.Pool(n_tasks) as pool:
async_results = [
pool.apply_async(self.process_function)
for _ in range(n_tasks)
]
# Wait for all processes to finish or timeout after 40 seconds
start_time = time.time()
expiration_time = start_time + 40
for async_result in async_results:
wait_time = expiration_time - time.time()
if wait_time <= 0:
# Time has expired and we cannot wait for the task to complete.
# So has the task already completed:
if async_result.ready():
# The task has completed and so
# this should not block:
pid, sleep_time = async_result.get()
results[pid] = sleep_time
else:
# The time has not expired so we are willing to wait
# a certain amount of time for the result:
try:
pid, sleep_time = async_result.get(wait_time)
except multiprocessing.TimeoutError:
# We will not get a result from this task
pass
else:
results[pid] = sleep_time
# When we exit the above block all remaining tasks will be cancelled
end_time = time.time()
print(f'Total elapsed time: {end_time - start_time} seconds')
for pid, sleep_time in results.items():
print(f'Process {pid} slept for {sleep_time} seconds')
if __name__ == '__main__':
GetDayMin().run()
Prints:
Total elapsed time: 40.01216197013855 seconds
Process 832 slept for 32 seconds
Process 833 slept for 35 seconds
Process 834 slept for 35 seconds
Process 837 slept for 31 seconds
Process 844 slept for 38 seconds
Process 846 slept for 34 seconds
Process 849 slept for 31 seconds
Process 851 slept for 36 seconds
Process 852 slept for 35 seconds
Process 853 slept for 30 seconds
Process 854 slept for 39 seconds
Process 856 slept for 31 seconds
Process 857 slept for 37 seconds
Process 859 slept for 38 seconds
Process 860 slept for 36 seconds