Parallelizing model predictions in keras using multiprocessing for python


I'm trying to run model predictions in parallel using the model.predict method provided by Keras, with TensorFlow 1.14.0 on Python 2.7. I have 5 model (.h5) files and would like the predict calls to run in parallel. I'm using a multiprocessing Pool to map the model filenames to the prediction function across multiple processes, as shown below:

import matplotlib as plt
import numpy as np
import cv2
from multiprocessing import Pool
pool=Pool()
def prediction(model_name):
    global input
    from tensorflow.keras.models import load_model
    model=load_model(model_name)
    ret_val=model.predict(input).tolist()[0]
    return ret_val

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
start_time=time.time()
res=pool.map(prediction,models)
print('Total time taken: {}'.format(time.time() - start_time))
print(res)

The input is an image numpy array obtained from another part of the code. On executing this, however, I get the following:

Traceback (most recent call last):
Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
  File "/usr/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
    self.run()
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    task = get()
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get
    return recv()
    return recv()
AttributeError: 'module' object has no attribute 'prediction'
AttributeError: 'module' object has no attribute 'prediction'

I can't interpret this error message. How do I go about solving it? Any advice is much appreciated!

UPDATE 2: Thanks for all the pointers, and for the full example, @sokato. I ran the exact code posted by @sokato, but got the following error (I made the same changes in my own code and got the same error):

Traceback (most recent call last):
  File "stackoverflow.py", line 47, in <module>
    with multiprocessing.Pool() as p:
AttributeError: __exit__

UPDATE 3: Thanks for all the support. I think the issue in UPDATE 2 was due to using Python 2 instead of Python 3. I was able to fix the UPDATE 2 error on Python 2 by using with closing(multiprocessing.Pool()) as p: instead of just with multiprocessing.Pool() as p: in @sokato's code. The closing function is imported as follows: from contextlib import closing
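
For reference, here is a minimal sketch of the closing() pattern described in this update. The helper name map_lengths and the use of the builtin len as the mapped function are placeholders chosen for illustration; they are not part of @sokato's code. It should run on both Python 2.7 and Python 3.

```python
from contextlib import closing
import multiprocessing


def map_lengths(words, processes=2):
    """Map the builtin len over words using a pool that is always closed."""
    # closing() calls p.close() when the block exits. This works on Python 2.7,
    # where multiprocessing.Pool is not itself a context manager.
    with closing(multiprocessing.Pool(processes)) as p:
        lengths = p.map(len, words)
    # The pool is closed at this point, so join() can wait for the workers to exit.
    p.join()
    return lengths


if __name__ == "__main__":
    print(map_lengths(["a", "bb", "ccc"]))  # [1, 2, 3]
```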

NEW ISSUE USING A DIFFERENT APPROACH SHOWN BELOW,

I actually have multiple inputs coming in. Instead of loading each model again for every input, I want to load all the models beforehand and keep them in a list. I have done this as shown below:

import matplotlib as plt
import numpy as np
import cv2
import multiprocessing
import tensorflow as tf
from contextlib import closing
import time

models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
loaded_models=[]
for model in models:
    loaded_models.append(tf.keras.models.load_model(model))

def prediction(input_tuple):
    inputs,loaded_models=input_tuple
    predops=[]
    for model in loaded_models:
        predops.append(model.predict(inputs).tolist()[0])
    actops=[]
    for predop in predops:
        actops.append(predop.index(max(predop)))
    max_freqq = max(set(actops), key = actops.count) 
    return max_freqq

#....some pre-processing....#

# new_all_t is a list of tuples; each tuple holds one input from all_t
# and the list of loaded models, which is unpacked in the prediction function.

new_all_t=[]
for elem in all_t:
    new_all_t.append((elem,loaded_models))
start_time=time.time()
with closing(multiprocessing.Pool()) as p:
    predops=p.map(prediction,new_all_t)
print('Total time taken: {}'.format(time.time() - start_time))

new_all_t is a list of tuples; each tuple holds one input from all_t and the list of loaded models, which is unpacked in the prediction function. However, I now get the following error:

Traceback (most recent call last):
  File "trial_mult-ips.py", line 240, in <module>
    predops=p.map(prediction,new_all_t)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
NotImplementedError: numpy() is only available when eager execution is enabled.

What exactly does this indicate? How do I go about solving this?

UPDATE 4: I added the lines tf.compat.v1.enable_eager_execution() and tf.compat.v1.enable_v2_behavior() at the very beginning. Now I get the following error:

WARNING:tensorflow:From /home/nick/.local/lib/python2.7/site-packages/tensorflow/python/ops/math_grad.py:1250: where (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

Traceback (most recent call last):
  File "the_other_end-mp.py", line 216, in <module>
    predops=p.map(prediction,modelon)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
ValueError: Resource handles are not convertible to numpy.

I can't interpret this error message either. How do I go about solving it? Any advice is much appreciated!


Solution

  • I am unsure of some of your design choices, but I gave it my best attempt with the given information. Specifically, I think there may be issues with the global variable and with the import statement inside your parallel function.

    1. Use shared variables rather than global variables to share an input between processes. The multiprocessing documentation has more on shared memory.

    2. I generated models from a tutorial since your models are not included.

    3. You are not joining or closing your pool. With the following code I was able to get the predictions to execute in parallel successfully. You can close the pool by calling pool.close() or by using the "with" syntax shown below. Note that multiprocessing.Pool does not support the with syntax in Python 2.7.

    import numpy as np
    import multiprocessing, time, ctypes, os
    import tensorflow as tf
    
    mis = (28, 28) #model input shape
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0
    
    def createModels(models):
        model = tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=mis),
            tf.keras.layers.Dense(128, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10)
        ])
    
        model.compile(optimizer='adam',
                   loss=tf.losses.SparseCategoricalCrossentropy(from_logits=True),
                   metrics=['accuracy'])
    
        model.fit(x_train, y_train, epochs=5)
    
        for mod in models:
            model.save(mod)
    
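    def prediction(model_name):
        # `input` is the shared array created under __main__. On fork-based
        # platforms, workers forked by Pool() afterwards inherit it.
        model=tf.keras.models.load_model(model_name)
        ret_val=model.predict(input).tolist()[0]
        return ret_val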
    
    if __name__ == "__main__":
        models=['model1.h5','model2.h5','model3.h5','model4.h5','model5.h5']
        dir = os.listdir(".")
        if models[0] not in dir:
            createModels(models)
        # Shared array input
        ub = 100
        testShape = x_train[:ub].shape
        input_base = multiprocessing.Array(ctypes.c_double,
                                           int(np.prod(testShape)), lock=False)
        input = np.ctypeslib.as_array(input_base)
        input = input.reshape(testShape)
        input[:ub] = x_train[:ub]
    
        # with multiprocessing.Pool() as p:  #Use me for python 3
        p = multiprocessing.Pool() #Use me for python 2.7
        start_time=time.time()
        res=p.map(prediction,models)
        p.close() #Use me for python 2.7
        p.join() # wait for the worker processes to exit after close()
        print('Total time taken: {}'.format(time.time() - start_time))
        print(res)
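
On the first AttributeError in the question ('module' object has no attribute 'prediction'): a likely cause is that pool=Pool() runs before prediction is defined. With the fork start method used on Linux, the workers are forked at the moment the pool is created, so their copy of the main module does not yet contain prediction, and unpickling the task fails. A minimal sketch of the ordering that avoids this is below. The body of prediction is a stand-in (it only upper-cases the filename) and run is a hypothetical helper, so treat it as an illustration rather than the exact fix for your script.

```python
import multiprocessing


def prediction(model_name):
    # Stand-in for load_model(model_name) followed by model.predict(...).
    return model_name.upper()


def run(model_names, processes=2):
    # The pool is created only after prediction exists at module level,
    # so the forked workers can look the function up by name.
    pool = multiprocessing.Pool(processes)
    try:
        return pool.map(prediction, model_names)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(run(["model1.h5", "model2.h5"]))
```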
    
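On the later errors (numpy() is only available when eager execution is enabled, and Resource handles are not convertible to numpy): p.map pickles every argument to send it to a worker, and each tuple in new_all_t carries the loaded Keras models, so these errors most likely come from trying to pickle the models. One way around this, sketched below under that assumption, is to pass only the model filenames to a Pool initializer so that each worker loads its own models once, and to send only the inputs through map. load_stub_model is a hypothetical stand-in so the sketch runs without TensorFlow; in your code it would be replaced by tf.keras.models.load_model, and the stub call m(x) by your model.predict and argmax logic.

```python
import multiprocessing
from collections import Counter

_worker_models = []  # filled once per worker process by init_worker


def load_stub_model(name):
    # Hypothetical stand-in for tf.keras.models.load_model(name): returns a
    # callable that maps an integer input to a fake class index in {0, 1, 2}.
    offset = len(name)
    return lambda x: (x + offset) % 3


def init_worker(model_names):
    # Runs once in every worker, so models are loaded per process, not per task.
    global _worker_models
    _worker_models = [load_stub_model(n) for n in model_names]


def prediction(x):
    # Majority vote over this worker's models. Only x is pickled and sent.
    votes = [m(x) for m in _worker_models]
    return Counter(votes).most_common(1)[0][0]


def run(model_names, inputs, processes=2):
    pool = multiprocessing.Pool(processes, initializer=init_worker,
                                initargs=(model_names,))
    try:
        return pool.map(prediction, inputs)
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    print(run(["model1.h5", "model2.h5", "model3.h5"], [0, 1, 2, 3]))
```

The trade-off is that every worker holds its own copy of all the models in memory, in exchange for never sending a model across a process boundary.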

    I hope this helps.