This question is a continuing from this but using tensorflow datasets.
So , if we use:
import tensorflow as tf
import numpy as np
from multiprocessing import Pool
from keras.datasets import fashion_mnist
from tensorflow.keras.models import Sequential
# importing various types of hidden layers
from tensorflow.keras.layers import Conv2D, MaxPooling2D,\
Dense, Flatten
# Adam optimizer for better LR and less loss
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np
# gpu setup
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
def model_arch():
models = Sequential()
# We are learning 64
# filters with a kernal size of 5x5
models.add(Conv2D(64, (5, 5),
padding="same",
activation="relu",
input_shape=(28, 28, 1)))
# Max pooling will reduce the
# size with a kernal size of 2x2
models.add(MaxPooling2D(pool_size=(2, 2)))
models.add(Conv2D(128, (5, 5), padding="same",
activation="relu"))
models.add(MaxPooling2D(pool_size=(2, 2)))
models.add(Conv2D(256, (5, 5), padding="same",
activation="relu"))
models.add(MaxPooling2D(pool_size=(2, 2)))
# Once the convolutional and pooling
# operations are done the layer
# is flattened and fully connected layers
# are added
models.add(Flatten())
models.add(Dense(256, activation="relu"))
# Finally as there are total 10
# classes to be added a FCC layer of
# 10 is created with a softmax activation
# function
models.add(Dense(10, activation="softmax"))
return models
def _apply_df(data):
model = model_arch()
model.load_weights("/home/ggous/model_mnist.h5")
return model.predict(data)
def apply_by_multiprocessing(data, workers):
pool = Pool(processes=workers)
result = pool.map(_apply_df, np.array_split(data, workers))
pool.close()
return list(result)
def resize_and_rescale(data):
data = tf.cast(data, tf.float32)
data /= 255.0
return data
def prepare(ds):
ds = ds.map(resize_and_rescale)
return ds.batch(1)
def after_prepare(data):
tens_data = tf.data.Dataset.from_tensor_slices(data)
tens_data = prepare(tens_data)
return tens_data
def main():
fashion_mnist = tf.keras.datasets.fashion_mnist
_, (test_images, test_labels) = fashion_mnist.load_data()
test_images = after_prepare(test_images)
results = apply_by_multiprocessing(test_images, workers=3)
print(test_images.shape) # (10000, 28, 28)
print(len(results)) # 3
print([x.shape for x in results]) # [(3334, 10), (3333, 10), (3333, 10)]
if __name__ == "__main__":
main()
we get an error:
axis1: axis 0 is out of bounds for array of dimension 0
I have just added:
def resize_and_rescale(data):
data = tf.cast(data, tf.float32)
data /= 255.0
return data
def prepare(ds):
ds = ds.map(resize_and_rescale)
return ds.batch(1)
def after_prepare(data):
tens_data = tf.data.Dataset.from_tensor_slices(data)
tens_data = prepare(tens_data)
return tens_data
so, I created tensorflow datasets in after_prepare
.
The saved model can be found here
-- UPDATE --
Now, it gives me messages:
F tensorflow/stream_executor/cuda/cuda_driver.cc:146] Failed setting context: CUDA_ERROR_NOT_INITIALIZED: initialization error
I saw this , so I tried:
multiprocessing.set_start_method('spawn', force=True)
at the beginning of the code and now gives me many messages:
Start cannot spawn child process: No such file or directory
2022-11-08 09:12:35.984897: I tensorflow/core/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
2022-11-08 09:12:35.984909: W tensorflow/stream_executor/gpu/asm_compiler.cc:80] Couldn't get ptxas version string: INTERNAL: Couldn't invoke ptxas --version
2022-11-08 09:12:35.985087: I tensorflow/core/platform/default/subprocess.cc:304] Start cannot spawn child process: No such file or directory
2022-11-08 09:12:35.985118: W tensorflow/stream_executor/gpu/redzone_allocator.cc:314] INTERNAL: Failed to launch ptxas
...
failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618099: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618274: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 230.40M (241592064 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618437: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 207.36M (217433088 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.618447: W tensorflow/core/common_runtime/bfc_allocator.cc:360] Garbage collection: deallocate free memory regions (i.e., allocations) so that we can re-allocate a larger region to avoid OOM due to memory fragmentation. If you see this message frequently, you are running near the threshold of the available device memory and re-allocation may incur great performance overhead. You may try smaller batch sizes to observe the performance impact. Set TF_ENABLE_GPU_GARBAGE_COLLECTION=false if you'd like to disable this feature.
2022-11-08 09:12:36.629520: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.629542: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 203.00MiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-11-08 09:12:36.629618: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.629987: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:36.630001: W tensorflow/core/common_runtime/bfc_allocator.cc:290] Allocator (GPU_0_bfc) ran out of memory trying to allocate 203.00MiB with freed_by_count=0. The caller indicates that this is not a failure, but this may mean that there could be performance gains if more memory were available.
2022-11-08 09:12:36.630110: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 230.40M (241592064 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
....
failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256468: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256640: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256810: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.256988: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.257166: I tensorflow/stream_executor/cuda/cuda_driver.cc:733] failed to allocate 256.00M (268435456 bytes) from device: CUDA_ERROR_OUT_OF_MEMORY: out of memory
2022-11-08 09:12:37.257224: W tensorflow/core/framework/op_kernel.cc:1780] OP_REQUIRES failed at conv_ops_fused_impl.h:601 : NOT_FOUND: No algorithm worked! Error messages:
Profiling failure on CUDNN engine 1#TC: RESOURCE_EXHAUSTED: Out of memory while trying to allocate 16777216 bytes.
Profiling failure on CUDNN engine 1: RESOURCE_EXHAUSTED: Out of memory while trying to allocate 16777216 bytes.
The problem comes from the data preparation step. The initial code takes the data of the shape of (10000, 28, 28)
, and using np.array_split
breaks it into a list of numpy arrays of the size of workers (here a list of 3 numpy arrays since workers=3
) to be processed by each worker.
Your input after returning from the after_prepare
function is a list of 1000 tensors because you are using batch(1)
, and this data produces the error when it reaches the np.array_split
call.
You have two options to solve this problem:
Option 1. Don't batch your data in the prepare
function and only return ds
. Then in the apply_by_multiprocessing
function change
result = pool.map(_apply_df, np.array_split(data, workers))
to
result = pool.map(_apply_df, np.array_split(list(data.as_numpy_iterator()), workers))
Option 2. Again don't batch your data in the prepare
function and only return ds
. Then in the apply_by_multiprocessing
function change
result = pool.map(_apply_df, np.array_split(data, workers))
to
result = pool.map(_apply_df, data.batch(np.ceil(len(data) / workers)))
Note that this produces a slightly different output shape due to how the batch size is calculated.
A working code example using Option 2 is below:
import os
import tensorflow as tf
import numpy as np
import multiprocessing
from multiprocessing import Pool
from itertools import chain
from keras.datasets import fashion_mnist
from tensorflow.keras.models import Sequential
# importing various types of hidden layers
from tensorflow.keras.layers import Conv2D, MaxPooling2D,\
Dense, Flatten
# Adam optimizer for better LR and less loss
from tensorflow.keras.optimizers import Adam
import matplotlib.pyplot as plt
import numpy as np
# gpu setup
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
def model_arch():
models = Sequential()
# We are learning 64
# filters with a kernal size of 5x5
models.add(Conv2D(64, (5, 5),
padding="same",
activation="relu",
input_shape=(28, 28, 1)))
# Max pooling will reduce the
# size with a kernal size of 2x2
models.add(MaxPooling2D(pool_size=(2, 2)))
models.add(Conv2D(128, (5, 5), padding="same",
activation="relu"))
models.add(MaxPooling2D(pool_size=(2, 2)))
models.add(Conv2D(256, (5, 5), padding="same",
activation="relu"))
models.add(MaxPooling2D(pool_size=(2, 2)))
# Once the convolutional and pooling
# operations are done the layer
# is flattened and fully connected layers
# are added
models.add(Flatten())
models.add(Dense(256, activation="relu"))
# Finally as there are total 10
# classes to be added a FCC layer of
# 10 is created with a softmax activation
# function
models.add(Dense(10, activation="softmax"))
return models
def _apply_df(data):
model = model_arch()
model.load_weights("model_mnist.h5")
return model.predict(data)
def apply_by_multiprocessing(data, workers):
pool = Pool(processes=workers)
# result = pool.map(_apply_df, np.array_split(list(data.as_numpy_iterator()), workers))
result = pool.map(_apply_df, data.batch(np.ceil(len(data) / workers)))
pool.close()
return list(result)
def resize_and_rescale(data):
data = tf.cast(data, tf.float32)
data /= 255.0
return data
def prepare(ds):
ds = ds.map(resize_and_rescale)
return ds
def after_prepare(data):
tens_data = tf.data.Dataset.from_tensor_slices(data)
tens_data = prepare(tens_data)
return tens_data
def main():
multiprocessing.set_start_method('spawn')
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
fashion_mnist = tf.keras.datasets.fashion_mnist
_, (test_images, test_labels) = fashion_mnist.load_data()
test_images = after_prepare(test_images)
results = apply_by_multiprocessing(test_images, workers=3)
print(test_images) # <MapDataset with shape=(28, 28)>
print(len(results)) # 3
print([x.shape for x in results]) # [(3334, 10), (3334, 10), (3332, 10)]
results_flatten = list(chain.from_iterable(results))
print(len(results_flatten), results_flatten[0].shape) # 10000 (10,)
if __name__ == "__main__":
main()