I am using SCOOP (and Python 3.6 - cannot be updated) in my work. I need all workers to perform a computation, then wait for the root node to execute a slow computation (the code inside an if __name__ == '__main__': block), and then perform another computation with a dataframe resulting from the root node's computation.
My problem is that SCOOP starts all workers right away and they asynchronously try to run all code outside the if __name__ == '__main__': block, even code that appears below that block. Since the dataframe does not exist yet, they throw an error.
What command can force all workers to wait for the root worker to complete a computation before continuing to run the rest of the code?
I have tried experimenting with scoop.futures.map, scoop.futures.supply and multiprocessing.managers without success. I have also tried using multiprocessing.Barrier(8).wait(), but it does not work.
There is a scoop.futures.wait(futures) method, but I do not know how to get the futures argument...
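As far as I can tell (SCOOP's futures API seems to mirror concurrent.futures, so this may be wrong), the futures argument would come from scoop.futures.submit() calls in the root process, roughly like the sketch below with a made-up slow_task function - but that only helps the root wait on tasks it submitted, not the workers waiting at module level:

from scoop import futures

def slow_task(x):  # hypothetical example task
    return x * 2

if __name__ == '__main__':
    # submit() returns Future objects, which can be collected and passed to wait()
    fs = [futures.submit(slow_task, i) for i in range(8)]
    futures.wait(fs)  # blocks until all submitted futures are done
    print([f.result() for f in fs])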
I have something like:
import pandas as pd
import genetic_algorithm
from scoop import futures

df = pd.read_csv('database.csv')  # dataframe is too large to be passed to fitness_function for every worker. I want every worker to have a copy of it!

if __name__ == '__main__':
    df = add_new_columns(df)  # heavy computation which I want to perform only once (not by all workers)

df = computation_using_new_columns(df)  # <--- !!! error - workers execute this before the slow add_new_columns(df) finishes

def fitness_function(): ...  # all workers use fitness_function(); an error is thrown if I put it inside the if __name__ == '__main__': block

if __name__ == '__main__':
    results = list(futures.map(genetic_algorithm, df))
I execute the script with python3 -m scoop script.py, which starts all workers right away...
Each process has its own memory space, so modifying the dataframe in the main process doesn't affect the workers. You need to pass it to the workers, after it has been processed, using some sort of initializer, which doesn't seem to be available in the SCOOP framework. A more flexible (but slightly more complicated) tool would be Python's built-in multiprocessing module and its Pool class.
import pandas as pd
import genetic_algorithm
from multiprocessing import Pool

def fitness_function(): ...

def initializer_func(df_from_parent):
    # runs once in every worker process: store the already-processed dataframe
    # as a module-level global so each worker has its own copy
    global df
    df = df_from_parent
    df = computation_using_new_columns(df)

if __name__ == '__main__':
    # read the df in the main process only, as it needs to be modified
    # before sending it to the workers
    df = pd.read_csv('database.csv')
    df = add_new_columns(df)  # modify the df in the main process

    # create as many workers as you have cpu cores, pass the df to each of them,
    # and have every worker execute computation_using_new_columns on its copy
    with Pool(initializer=initializer_func, initargs=(df,)) as pool:
        results = list(pool.imap(genetic_algorithm, df))  # now run your function
If computation_using_new_columns needs to execute in each worker, then you can keep it in the initializer; but if it only needs to execute once, then you can move it after add_new_columns inside the if __name__ == "__main__": block.
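For instance, if it only needs to run once, a rough sketch of that variant (reusing the same function names from your example) would be:

def initializer_func(df_from_parent):
    global df
    df = df_from_parent  # workers just receive the fully processed dataframe

if __name__ == '__main__':
    df = pd.read_csv('database.csv')
    df = add_new_columns(df)
    df = computation_using_new_columns(df)  # runs once, in the main process only
    with Pool(initializer=initializer_func, initargs=(df,)) as pool:
        results = list(pool.imap(genetic_algorithm, df))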