I have the following code that runs two TensorFlow trainings in parallel using Dask workers implemented in Docker containers.
I need to launch two processes, using the same dask client, where each will train their respective models with N workers.
To that end, I use:
- joblib.delayed to spawn the two processes.
- with joblib.parallel_backend('dask'): to execute the fit/training logic. Each training process triggers N dask workers.

The problem is that I don't know whether the entire process is thread safe. Are there any concurrency elements that I'm missing?
# First, submit the function twice using joblib delay
delayed_funcs = [joblib.delayed(train)(sub_task) for sub_task in [123, 456]]
parallel_pool = joblib.Parallel(n_jobs=2)
parallel_pool(delayed_funcs)

# Second, submit each training process
def train(sub_task):
    global client
    if client is None:
        print('connecting')
        client = Client()
    data = some_data_to_train
    # Third, process the training itself with N workers
    with joblib.parallel_backend('dask'):
        X = data[columns]
        y = data[label]
        niceties = dict(verbose=False)
        model = KerasClassifier(build_fn=build_layers,
                                loss=tf.keras.losses.MeanSquaredError(), **niceties)
        model.fit(X, y, epochs=500, verbose=0)

The question, as given, could easily be marked as "unclear" for SO. A couple of notes:
- global client: this makes the client object available outside of the function. But the function runs in another process, so creating the client there does not affect any other process.
- if client is None: this is a NameError, because client is never defined before the check; your code doesn't actually run as written.
- client = Client(): you make a new cluster in each subprocess, each assuming the total resources available, which oversubscribes those resources.

You must ask yourself: why are you creating processes for the two fits at all? Why not just let Dask figure out its parallelism, which is what it's meant for?
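
As a rough illustration of letting Dask handle the parallelism, the sketch below submits both trainings to a single client instead of spawning processes by hand. The train body is a hypothetical stand-in for the Keras fit, and processes=False (in-process workers) is an assumption made only to keep the example small and self-contained.

```python
from dask.distributed import Client


def train(sub_task):
    """Hypothetical stand-in for building and fitting one model."""
    return sub_task * 2


def run_both(sub_tasks):
    # One client and one cluster; Dask schedules the tasks across its workers.
    # Assumption: in-process workers, chosen only to keep the sketch small.
    with Client(processes=False, n_workers=2, threads_per_worker=1) as client:
        futures = client.map(train, sub_tasks)
        return client.gather(futures)  # results come back in input order


if __name__ == "__main__":
    print(run_both([123, 456]))
```

Both tasks share the same cluster here, so the workers are counted once rather than once per subprocess.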
--
-EDIT-
to answer the form of the question asked in a comment.
My question is whether using the same client variable in these two parallel processes creates a problem.
No, the two client variables are unrelated to one another. You may see a warning message about not being able to bind to a default port, which you can safely ignore. However, please don't make it global, as this is unnecessary and makes what you are doing less clear.
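
To see why the two variables are unrelated, here is a small standard-library sketch. It uses multiprocessing with the fork start method (an assumption that limits it to POSIX systems; joblib starts its workers differently, but each worker is likewise a separate process with its own globals), and a plain string stands in for Client().

```python
import multiprocessing as mp

client = None  # module-level name, so `client is None` does not raise NameError


def connect(queue):
    """Run in a child process: rebind the child's own copy of the global."""
    global client
    client = "connected in child"  # hypothetical stand-in for Client()
    queue.put(client)


def run_demo():
    ctx = mp.get_context("fork")  # assumption: POSIX only
    queue = ctx.Queue()
    proc = ctx.Process(target=connect, args=(queue,))
    proc.start()
    child_value = queue.get()  # the value the child assigned
    proc.join()
    return child_value, client  # the parent's global is untouched


if __name__ == "__main__":
    print(run_demo())
```

The child reports the value it assigned, while the parent's client is still None afterwards.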
--
I think I must answer the question as phrased in your comment, which I advise you to add to the main question:
I need to launch two processes, using the same dask client, where each will train their respective models with N workers.
You have the following options:
- Create the client once in the main process with Client(), get its address (e.g., client._scheduler_identity['address']), and connect to that.
- Call client.write_scheduler_file and use that file.

You will connect in the function with
client = Client(address)
or
client = Client(scheduler_file=the_file_you_wrote)
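
The first option could look roughly like the sketch below. It reads the address through client.scheduler.address rather than the private attribute above, uses pow as a hypothetical stand-in for the training step, and runs an in-process cluster (processes=False) purely so the example is self-contained; these are assumptions, not part of the original setup. With a default Client() the address is a tcp:// URL that other processes can connect to.

```python
from dask.distributed import Client


def train(sub_task, address):
    """Connect to an existing scheduler instead of calling Client() again."""
    with Client(address) as client:
        # pow is a hypothetical stand-in for the real model.fit call
        return client.submit(pow, sub_task, 2).result()


def run_demo():
    # Assumption: in-process workers, only to keep the sketch self-contained
    with Client(processes=False, n_workers=2, threads_per_worker=1) as main_client:
        address = main_client.scheduler.address
        return [train(sub_task, address) for sub_task in [123, 456]]


if __name__ == "__main__":
    print(run_demo())
```

The scheduler-file option follows the same shape: write the file once with client.write_scheduler_file, then pass scheduler_file= to Client inside the function.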