I'm currently trying to implement a pipeline with TFX (I've followed this notebook: TFX - Chicago Taxi) in order to serve a model with TensorFlow Serving, adapting it into my own pipeline to classify texts (coming from this dataset: Kaggle - BBC News Classification).
So far I'm able to implement every component up to the Trainer. Here is, for instance, my Transform component:
import tensorflow as tf
import tensorflow_transform as tft
from utils import documents_constants
_TEXT_FEATURE_KEYS = documents_constants.TEXT_FEATURE_KEYS
_VOCAB_SIZE = documents_constants.VOCAB_SIZE
_OOV_SIZE = documents_constants.OOV_SIZE
_LABEL_KEY = documents_constants.LABEL_KEY
_transformed_name = documents_constants.transformed_name
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}

  # Pass the text features through unchanged.
  for key in _TEXT_FEATURE_KEYS:
    outputs[_transformed_name(key)] = inputs[key]

  # Make a vocabulary out of the output label.
  outputs[_transformed_name(_LABEL_KEY)] = tft.compute_and_apply_vocabulary(
      _fill_in_missing(inputs[_LABEL_KEY]),
      top_k=_VOCAB_SIZE,
      num_oov_buckets=_OOV_SIZE)

  return outputs
def _fill_in_missing(x):
  """Replace missing values in a SparseTensor.

  Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

  Args:
    x: A `SparseTensor` of rank 2. Its dense shape should have size at most 1
      in the second dimension.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in.
  """
  default_value = '' if x.dtype == tf.string else 0
  return tf.squeeze(
      tf.sparse.to_dense(
          tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
          default_value),
      axis=1)
This one just aims at passing the raw 'Text' column through untouched and computing a vocabulary over the output category.
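To make the label encoding concrete, outside of tf.Transform it behaves roughly like a frequency-ordered lookup table (a standalone sketch with a hypothetical vocabulary; tft builds the vocabulary from the data itself):

import tensorflow as tf

# Rough analogue of compute_and_apply_vocabulary applied to the label column.
vocab = tf.constant(['sport', 'business', 'tech'])  # most frequent first
table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        keys=vocab, values=tf.range(3, dtype=tf.int64)),
    default_value=-1)  # tft instead hashes OOV labels into _OOV_SIZE buckets

print(table.lookup(tf.constant(['sport', 'tech', 'sport'])))  # [0 2 0]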
My problem arises when I try to build a model that includes a
tensorflow.keras.layers.experimental.preprocessing.TextVectorization
layer. I can include it pretty easily in a model like this:
def _build_keras_model(vectorize_layer: TextVectorization) -> tf.keras.Model:
  """Creates a DNN Keras model for classifying documents.

  Args:
    vectorize_layer: the adapted TextVectorization layer, used as the first
      layer of the model.

  Returns:
    A Keras Model.
  """
  # The first layer in our model is the vectorization layer. After this layer,
  # we have a tensor of shape (batch_size, sequence_length) containing integer
  # token indices.
  input_layer = tf.keras.layers.Input(name="Text_xf", shape=(), dtype=tf.string)
  deep = vectorize_layer(input_layer)
  deep = layers.Embedding(_max_features + 1, _embedding_dim)(deep)
  deep = layers.Dropout(0.2)(deep)
  deep = layers.GlobalAveragePooling1D()(deep)
  deep = layers.Dropout(0.2)(deep)
  # No activation here: the loss below is configured with from_logits=True.
  output = layers.Dense(5, name='predictions')(deep)

  # Compile the model with sparse categorical crossentropy loss and an Adam
  # optimizer.
  model = tf.keras.Model(input_layer, output)
  model.compile(
      loss=losses.SparseCategoricalCrossentropy(from_logits=True),
      optimizer='adam',
      metrics=['accuracy'])
  return model
And this works. But when I try to fit it on my dataset I get this:
TypeError: in user code:
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:806 train_function *
return step_function(self, iterator)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:796 step_function **
outputs = model.distribute_strategy.run(run_step, args=(data,))
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1211 run
return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2585 call_for_each_replica
return self._call_for_each_replica(fn, args, kwargs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2945 _call_for_each_replica
return fn(*args, **kwargs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:789 run_step **
outputs = model.train_step(data)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:747 train_step
y_pred = self(x, training=True)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:985 __call__
outputs = call_fn(inputs, *args, **kwargs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/functional.py:386 call
inputs, training=training, mask=mask)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/functional.py:508 _run_internal_graph
outputs = node.layer(*args, **kwargs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:985 __call__
outputs = call_fn(inputs, *args, **kwargs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/layers/preprocessing/text_vectorization.py:571 call
inputs = self._preprocess(inputs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/layers/preprocessing/text_vectorization.py:527 _preprocess
lowercase_inputs = gen_string_ops.string_lower(inputs)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/ops/gen_string_ops.py:1028 string_lower
"StringLower", input=input, encoding=encoding, name=name)
/opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py:479 _apply_op_helper
repr(values), type(values).__name__, err))
TypeError: Expected string passed to parameter 'input' of op 'StringLower', got of type 'SparseTensor' instead. Error: Expected string, got of type 'SparseTensor' instead.
I'm fairly new to TensorFlow and I'm still trying to understand the whole process of writing pipelines with TFX, so I don't get why the vectorization layer expects a plain string tensor rather than a SparseTensor. I know the implications of using experimental features, but if anyone has an idea, or can point out an obvious mistake I'm making, that would be great! I'm running out of ideas to make this work.
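For what it's worth, the mismatch can be reproduced outside of TFX in a few lines (a minimal sketch, assuming the TF 2.3-era experimental API):

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization

layer = TextVectorization(max_tokens=100, output_mode='int')
layer.adapt(np.array(['some sample text', 'another document']))

print(layer(tf.constant(['some sample text'])))  # dense string tensor: works

sparse = tf.sparse.from_dense(tf.constant([['some sample text']]))
# layer(sparse)  # raises the TypeError above: StringLower wants dense strings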
Note: I thought the problem might come from the way I retrieve the dataset:
def _input_fn(file_pattern: List[Text],
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch.

  Returns:
    A dataset that contains (features, indices) tuple where features is a
    dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  transformed_feature_spec = (
      tf_transform_output.transformed_feature_spec().copy())

  dataset = tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=transformed_feature_spec,
      reader=_gzip_reader_fn,
      label_key=_transformed_name(_LABEL_KEY))

  return dataset
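Since preprocessing_fn passes the raw text through untouched, the transformed feature spec keeps it as a VarLenFeature, so make_batched_features_dataset parses it back into a SparseTensor. This is easy to check by printing the element spec (a sketch, calling _input_fn from inside run_fn):

# The text feature comes back as a SparseTensor because its spec is a
# VarLenFeature; the label, by contrast, is a dense int64 tensor.
ds = _input_fn(fn_args.train_files, tf_transform_output, batch_size=4)
print(ds.element_spec)
# ({'Text_xf': SparseTensorSpec(TensorShape([None, None]), tf.string), ...},
#  TensorSpec(shape=(None,), dtype=tf.int64, name=None))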
And I use it like this:
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  # TODO: Make better method to adapt vectorizer layer
  text_feature_spec = {
      _transformed_name('Text'): tf.io.FixedLenFeature([], dtype=tf.string)}
  text_dataset = _input_text_fn(fn_args.train_files, text_feature_spec, 978)
  text_dataset = text_dataset.map(lambda d: d[_transformed_name('Text')]).take(1)

  vectorize_layer = get_vectorize_layer()
  vectorize_layer.adapt(text_dataset)

  model = _build_keras_model(vectorize_layer)

  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')

  print(train_dataset)

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
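For reference, _get_serve_tf_examples_fn is not shown above; it is essentially the helper from the Chicago Taxi notebook I followed, which looks roughly like this:

def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses serialized tf.Examples and applies TFT."""
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Parses raw tf.Examples, applies the transform graph, runs the model."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)  # The label is not available at serving time.
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)
    return model(transformed_features)

  return serve_tf_examples_fn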
I've managed to make it work, though not in the cleanest way.
The reason I got this message is that the TextVectorization layer in the model only accepts a tensor (a dense one, it seems), a numpy array, a list or a dataset. So I give it what it wants by adapting my code like this (this is the full updated function):
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
  eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

  # Densify the sparse text feature and unbatch it so the vectorization
  # layer can be adapted on a dataset of plain string tensors.
  vectorize_dataset = train_dataset.map(
      lambda f, l: tf.sparse.to_dense(f[_transformed_name('Text')])).unbatch()

  vectorize_layer = TextVectorization(
      max_tokens=_max_features,
      output_mode='int',
      output_sequence_length=500)
  vectorize_layer.adapt(vectorize_dataset.take(900))

  model = _build_keras_model(vectorize_layer)

  log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=log_dir, update_freq='batch')

  # Densify the text feature on the fly for training and validation as well.
  model.fit(
      train_dataset.map(
          lambda f, l: (tf.sparse.to_dense(f[_transformed_name('Text')]), l)),
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset.map(
          lambda f, l: (tf.sparse.to_dense(f[_transformed_name('Text')]), l)),
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
Notice the map functions in the arguments of fit. The rest stayed pretty much the same (I just adjusted the shape of the input layer and tweaked the model to get better results).
I wonder if there is an easier way to achieve this while still keeping the benefits of SparseTensor.
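One alternative that might be cleaner (an untested sketch, and it gives up the sparsity at the Transform stage instead): densify the text inside preprocessing_fn itself, reusing the _fill_in_missing helper above, so that Transform stores the text under a dense FixedLenFeature spec and the tf.sparse.to_dense maps in run_fn become unnecessary:

# In preprocessing_fn (untested sketch): emit the text as a dense rank-1
# string tensor so the transformed feature spec becomes a FixedLenFeature
# and _input_fn yields dense string tensors directly.
for key in _TEXT_FEATURE_KEYS:
  outputs[_transformed_name(key)] = _fill_in_missing(inputs[key])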