I'm trying to run model predictions in parallel using the model.predict command provided by Keras, on Python 2.7 with TensorFlow 1.14.0. I have 5 model (.h5) files and would like the predict calls to run in parallel. I'm using a multiprocessing Pool to map the model filenames to the prediction function across multiple processes, as shown below:
import matplotlib as plt
import numpy as np
import cv2
import time
from multiprocessing import Pool
pool=Pool()
def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model=load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val
models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
start_time=time.time()
res=pool.map(prediction,models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)
The input is an image numpy array obtained from another part of the code. When I run this, I get the following:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
self.run()
self.run()
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
self._target(*self._args, **self._kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
task = get()
File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
return recv()
return recv()
AttributeError: 'module' object has no attribute 'prediction'
AttributeError: 'module' object has no attribute 'prediction'
I'm not able to interpret this error message. How do I go about solving this? Any advice is much appreciated!
UPDATE 2: Thanks for all the pointers, and for the full example, @sokato. I executed the exact code posted by @sokato, but I got the following error (I made the same changes in my own code and got the same error):
Traceback (most recent call last):
File "stackoverflow.py", line 47, in <module>
with multiprocessing.Pool() as p:
AttributeError: __exit__
UPDATE 3:
Thanks for all the support. I think the issue in UPDATE 2 was due to using Python 2 instead of Python 3. I was able to solve the error from UPDATE 2 on Python 2 by using with closing(multiprocessing.Pool()) as p:
instead of just with multiprocessing.Pool() as p:
in @sokato's code. Import the closing function as follows: from contextlib import closing
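A minimal sketch of that closing() pattern, which should behave the same on Python 2.7 and Python 3. The names square and run_squares are illustrative stand-ins, not part of the code above. Note that closing() calls p.close() on exit, whereas the Python 3 "with multiprocessing.Pool()" form calls terminate().

```python
from contextlib import closing
import multiprocessing


def square(x):
    # Stand-in worker function; it must be defined at module level
    # so that worker processes can look it up by name.
    return x * x


def run_squares(values, processes=2):
    # closing() calls p.close() when the block exits, which works on
    # Python 2.7 where Pool is not a context manager.
    with closing(multiprocessing.Pool(processes)) as p:
        results = p.map(square, values)
    # After close(), join() waits for the workers to exit.
    p.join()
    return results


if __name__ == "__main__":
    print(run_squares([1, 2, 3]))
```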
NEW ISSUE USING A DIFFERENT APPROACH (SHOWN BELOW):
I actually have multiple inputs coming in. Instead of loading the models again for each input, I want to load all the models beforehand and keep them in a list. I have done this as shown below:
import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time
models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
loaded_models=[]
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))
def prediction(input_tuple):
    inputs,loaded_models=input_tuple
    predops=[]
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops=[]
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key = actops.count)
    return max_freqq
#....some pre-processing....#
'''new_all_t is a list which contains tuples and each tuple has inputs from all_t
and the list containing loaded models which will be extracted
in the prediction function.'''
new_all_t=[]
for elem in all_t:
    new_all_t.append((elem,loaded_models))
start_time=time.time()
with closing(multiprocessing.Pool()) as p:
    predops=p.map(prediction,new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))
new_all_t is a list of tuples; each tuple pairs one input from all_t with the list of loaded models, which the prediction function unpacks. However, I now get the following error:
Traceback (most recent call last):
File "trial_mult-ips.py", line 240, in <module>
predops=p.map(prediction,new_all_t)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.
What exactly does this indicate? How do I go about solving this?
UPDATE 4:
I included the lines tf.compat.v1.enable_eager_execution()
and
tf.compat.v1.enable_v2_behavior()
at the very beginning. Now I get the following error:
WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
Traceback (most recent call last):
File "the_other_end-mp.py", line 216, in <module>
predops=p.map(prediction,modelon)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
return self.map_async(func, iterable, chunksize).get()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
raise self._value
ValueError: Resource handles are not convertible to numpy.
I'm not able to interpret this error message either. How do I go about solving this? Any advice is much appreciated!
I am unsure about some of your design choices, but I gave it my best attempt with the information given. Specifically, I think there may be issues with the global variable and with the import statement inside your parallel function.
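On the first traceback specifically: a likely cause, assuming the default fork start method on Linux, is that pool=Pool() runs before prediction is defined. The workers are forked at that point and later fail to look up prediction by name, which matches "'module' object has no attribute 'prediction'". A minimal sketch of the reordered layout follows; the body of prediction here is a stand-in, not the real Keras call, and main is an illustrative name.

```python
import multiprocessing


def prediction(model_name):
    # Stand-in for load_model(model_name).predict(...); defined at
    # module level BEFORE any Pool is created.
    return len(model_name)


def main(models):
    # Create the pool only after the worker function exists.
    pool = multiprocessing.Pool(2)
    try:
        return pool.map(prediction, models)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(main(['model1.h5', 'model2.h5']))
```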
You should use shared variables rather than global variables to share an input between processes. The multiprocessing documentation has more detail on shared memory.
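As a rough illustration of the shared-variable idea, here is a small sketch that places a numpy array in a multiprocessing.Array and hands it to each worker through the Pool initializer. The names make_shared, init_worker and row_sum are hypothetical helpers chosen for this example, and the row sum stands in for a real prediction.

```python
import ctypes
import multiprocessing
from contextlib import closing

import numpy as np

_shared = None  # numpy view of the shared buffer, set in each worker


def make_shared(data):
    # Allocate an unlocked shared buffer and copy the data into it.
    base = multiprocessing.Array(ctypes.c_double, int(data.size), lock=False)
    view = np.frombuffer(base, dtype=np.float64).reshape(data.shape)
    view[:] = data
    return base, view


def init_worker(shared_base, shape):
    # Runs once per worker: wrap the shared buffer without copying it.
    global _shared
    _shared = np.frombuffer(shared_base, dtype=np.float64).reshape(shape)


def row_sum(i):
    # Stand-in for a prediction that reads the shared input.
    return float(_shared[i].sum())


if __name__ == "__main__":
    data = np.arange(12, dtype=np.float64).reshape(4, 3)
    base, _ = make_shared(data)
    pool = multiprocessing.Pool(2, initializer=init_worker,
                                initargs=(base, data.shape))
    with closing(pool) as p:
        print(p.map(row_sum, range(4)))
```

Passing the buffer through initargs rather than as a map argument means the data itself is not pickled for every task.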
I generated models from a tutorial since your models are not included.
You are not closing or joining your pool. With the following code I was able to run the predictions in parallel successfully. You can close the pool by calling pool.close()
or by using the "with" syntax shown in the code below. Note that multiprocessing.Pool does not support the with statement in Python 2.7.
import numpy as np
import multiprocessing, time, ctypes, os
import tensorflow as tf
mis = (28, 28) #model input shape
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
def createModels(models):
    model = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=mis),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(10)
    ])
    model.compile(optimizer='adam',
                  loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])
    model.fit(x_train, y_train, epochs=5)
    for mod in models:
        model.save(mod)
def prediction(model_name):
    model=tf.keras.models.load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val
if __name__ == "__main__":
    models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
    dir = os.listdir(".")
    if models[0] not in dir:
        createModels(models)
    # Shared array input
    ub = 100
    testShape = x_train[:ub].shape
    input_base = multiprocessing.Array(ctypes.c_double,
                                       int(np.prod(testShape)),lock=False)
    input = np.ctypeslib.as_array(input_base)
    input = input.reshape(testShape)
    input[:ub] = x_train[:ub]
    # with multiprocessing.Pool() as p: #Use me for python 3
    p = multiprocessing.Pool() #Use me for python 2.7
    start_time=time.time()
    res=p.map(prediction,models)
    p.close() #Use me for python 2.7
    print('Total time taken: {}'.format(time.time() - start_time))
    print(res)
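For the later approach in the question (loading every model once and reusing it for many inputs), passing loaded Keras models through p.map means they have to be pickled, which is a plausible source of the "Resource handles are not convertible to numpy" error. One alternative, sketched here under that assumption, is to load the models inside each worker through the Pool initializer and send only the inputs through map. The names load_keras_model, init_worker, majority_vote, predict_class and run are illustrative, not Keras APIs, and this sketch has not been run against TensorFlow 1.14.

```python
import multiprocessing
from contextlib import closing

_MODELS = []  # per-process cache, filled by init_worker


def load_keras_model(path):
    # Imported lazily so TensorFlow is initialised in the worker,
    # not in the parent process before it forks.
    from tensorflow.keras.models import load_model
    return load_model(path)


def init_worker(model_paths, loader=load_keras_model):
    # Runs once per worker: each process loads its own model copies.
    global _MODELS
    _MODELS = [loader(p) for p in model_paths]


def majority_vote(labels):
    # Most frequent label, as in the question's max(set(...)) idiom.
    return max(set(labels), key=labels.count)


def predict_class(inputs):
    # Argmax of each model's first output row, then a majority vote.
    labels = []
    for model in _MODELS:
        scores = model.predict(inputs).tolist()[0]
        labels.append(scores.index(max(scores)))
    return majority_vote(labels)


def run(model_paths, all_inputs):
    # Only file paths and input arrays cross the process boundary.
    pool = multiprocessing.Pool(initializer=init_worker,
                                initargs=(model_paths,))
    with closing(pool) as p:
        return p.map(predict_class, all_inputs)
```

With this layout, run(models, all_t) would replace the p.map call over new_all_t, and it should be invoked from under an if __name__ == "__main__": guard.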
I hope this helps.