Search code examples
Tags: python, tensorflow, parallel-processing, neural-network, concurrent.futures

Predicting in parallel with tensorflow.keras models using concurrent.futures


I am trying to implement some parallel jobs using concurrent.futures. Each worker requires a copy of a TensorFlow model and some data. I implemented it in the following way (MWE):

import tensorflow as tf
from tensorflow import keras
import numpy as np
import concurrent.futures 
import time


def simple_model():
    """Build and compile a small two-layer Keras model.

    The network takes one input feature, passes it through a 10-unit Dense
    layer and then a single sigmoid unit, and is compiled with the SGD
    optimizer and mean-squared-error loss.
    """
    stack = [
        keras.layers.Dense(10, input_shape=[1]),
        keras.layers.Dense(1, activation='sigmoid'),
    ]
    net = keras.models.Sequential(stack)
    net.compile(optimizer='sgd', loss='mean_squared_error')
    return net

def clone_model(model):
    """Return a copy of *model* with the same architecture and weights.

    ``tf.keras.models.clone_model`` only reproduces the architecture, so the
    source model's weights are copied onto the clone explicitly.
    """
    weights = model.get_weights()
    replica = tf.keras.models.clone_model(model)
    replica.set_weights(weights)
    return replica

def work(model, seq):
    """Stand-in workload: return ``model.predict(seq)`` unchanged."""
    predictions = model.predict(seq)
    return predictions

def worker(model, num_of_seq = 4):
    """Split ``np.arange(0, 100)`` into chunks and predict each chunk in a process pool.

    One clone of *model* is made per chunk, and ``work(clone, chunk)`` is
    submitted for each pair.

    Args:
        model: Keras model; it is cloned once per chunk.
        num_of_seq: Number of chunks. It must divide 100, otherwise
            ``reshape`` raises ``ValueError``.

    Returns:
        Tuple of (flattened predictions in input order, elapsed seconds).
    """
    sequences = np.arange(0,100).reshape(num_of_seq, -1)
    with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
        t0 = time.perf_counter()
        model_list = [clone_model(model) for _ in range(num_of_seq)]
        # NOTE: the question reports that this hangs with ProcessPoolExecutor,
        # presumably because the Keras models passed here must be pickled and
        # sent to the child processes.
        futures = [executor.submit(work, m, seq) for m, seq in zip(model_list, sequences)]
    # Leaving the ``with`` block waits for every submitted task to finish.
    # Collect results in submission order. ``as_completed`` yields futures in
    # completion order, which would misalign outputs with their input chunks.
    Seq_out = [future.result() for future in futures]
    t1 = time.perf_counter()
    print(t1-t0)
    return np.reshape(Seq_out, (-1, )), t1-t0



if __name__ == '__main__':
    # Run the demo only when executed as a script, not on import
    # (worker processes may import this module).
    model = simple_model()
    out = worker(model=model, num_of_seq=4)
    print(out)

simple_model() creates the model, clone_model() clones a TensorFlow model, work() is a minimal stand-in for the real workload, and worker() distributes the work in parallel.

This does not work: it just gets stuck and produces no output. The code does work if I replace ProcessPoolExecutor with ThreadPoolExecutor, but then there is no speedup (possibly because the workers are not actually running in parallel).

From what I understand, the problem lies in passing the model argument in future_to_samples = {executor.submit(work, model, seq): seq for model, seq in zip(model_list, seqences)}.


Solution

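  • I modified the code so that it sends the path of the saved model, rather than the model itself, to the child processes, and now it works. Each child process then loads its own copy of the model from that path with tf.keras.models.load_model.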

    import tensorflow as tf
    from tensorflow import keras
    
    import numpy as np
    import concurrent.futures 
    import time
    
    # gpus = tf.config.experimental.list_physical_devices('GPU')
    # if len(gpus) > 0:
    #     print(f'GPUs {gpus}')
    #     try: tf.config.experimental.set_memory_growth(gpus[0], True)
    #     except RuntimeError: pass
    
    def simple_model():
        """Return a compiled two-layer Keras model.

        Architecture: one input feature -> Dense(10) -> Dense(1, sigmoid).
        Training setup: SGD optimizer, mean-squared-error loss.
        """
        hidden = keras.layers.Dense(10, input_shape=[1])
        output = keras.layers.Dense(1, activation='sigmoid')
        net = keras.models.Sequential([hidden, output])
        net.compile(optimizer='sgd', loss='mean_squared_error')
        return net
    
    def clone_model(model):
        """Return a copy of *model* sharing its architecture and weight values.

        The architecture is rebuilt with ``tf.keras.models.clone_model``, then
        the original's weights are copied onto it.
        """
        weights = model.get_weights()
        replica = tf.keras.models.clone_model(model)
        replica.set_weights(weights)
        return replica
    
    def work(model_path, seq):
        """Load the saved model at *model_path* and return its predictions for *seq*.

        The model is loaded from disk inside the task. The caller therefore
        only submits a path to the child process, never the model object.
        """
        model = tf.keras.models.load_model(model_path)
        return model.predict(seq)
    
    def worker(model, num_of_seq = 4, model_savepath = './simple_model.h5'):
        """Save *model* to disk, then predict chunks of input in a process pool.

        Child processes receive the saved-model path, not the model object.
        Each task loads its own copy through ``work``.

        Args:
            model: Keras model to save and predict with.
            num_of_seq: Number of input chunks (and tasks). Chunk ``i`` holds
                the 10 integers ``10*i .. 10*i + 9``.
            model_savepath: File the model is written to before the pool
                starts. Defaults to ``./simple_model.h5``, as before.

        Returns:
            Tuple of (flattened predictions in input order, elapsed seconds).
        """
        # A fixed row length of 10 gives the same rows as ``reshape(n, -1)``
        # and also works for num_of_seq == 0.
        sequences = np.arange(0, num_of_seq*10).reshape(num_of_seq, 10)
        model.save(model_savepath)
        with concurrent.futures.ProcessPoolExecutor(max_workers=None) as executor:
            t0 = time.perf_counter()
            # NOTE(review): every task reloads the model from disk. With many
            # small chunks that load likely dominates the run time. Loading
            # once per process via the executor's ``initializer`` argument may
            # be worth measuring.
            futures = [executor.submit(work, model_savepath, seq) for seq in sequences]
        # Leaving the ``with`` block waits for every submitted task to finish.
        # Collect results in submission order. ``as_completed`` would yield
        # them in completion order and misalign outputs with input chunks.
        Seq_out = [future.result() for future in futures]
        t1 = time.perf_counter()
        print(t1-t0)
        return np.reshape(Seq_out, (-1, )), t1-t0
    
    
    
    if __name__ == '__main__':
        # Run the demo only when executed as a script, not on import
        # (worker processes may import this module).
        model = simple_model()
        num_of_seq = 400
        out = worker(model, num_of_seq=num_of_seq)
        print(out)