Search code examples
tensorflow keras deep-learning tfx

Tensorflow TextVectorization layer in model with TFX


I'm currently trying to implement a pipeline using TFX (I've followed this notebook: TFX - Chicago Taxi) in order to serve it with Tensorflow Serving. I'm trying to implement my own pipeline to classify texts (coming from this dataset: Kaggle - BBC News Classification).

For now, I'm able to implement every component up to the Trainer. Here is, for instance, my Transform component:

import tensorflow as tf
import tensorflow_transform as tft

from utils import documents_constants

# Short local aliases for values declared in the `documents_constants` module.
_TEXT_FEATURE_KEYS = documents_constants.TEXT_FEATURE_KEYS  # keys of the raw text features
_VOCAB_SIZE = documents_constants.VOCAB_SIZE  # used as `top_k` when building the label vocabulary
_OOV_SIZE = documents_constants.OOV_SIZE  # used as `num_oov_buckets` for the label vocabulary
_LABEL_KEY = documents_constants.LABEL_KEY  # key of the raw label feature
_transformed_name = documents_constants.transformed_name  # called with a raw key to get its output key


def preprocessing_fn(inputs):
  """Build the tf.transform preprocessing graph for the document features.

  Text features are forwarded untouched under their transformed name; the
  label is densified and then mapped to an integer id through a vocabulary
  computed over the dataset.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  # Text columns pass straight through; only their key is renamed.
  transformed = {
      _transformed_name(text_key): inputs[text_key]
      for text_key in _TEXT_FEATURE_KEYS
  }

  # Fill in missing labels first, then index them with a vocabulary.
  dense_label = _fill_in_missing(inputs[_LABEL_KEY])
  label_ids = tft.compute_and_apply_vocabulary(
      dense_label, top_k=_VOCAB_SIZE, num_oov_buckets=_OOV_SIZE)
  transformed[_transformed_name(_LABEL_KEY)] = label_ids

  return transformed

def _fill_in_missing(x):
  """Replace missing values in a SparseTensor and return a dense tensor.

  Missing entries of `x` are filled with '' (string dtype) or 0 (any other
  dtype) and the result is converted to a dense tensor. An input that is not
  a `tf.SparseTensor` has nothing to fill in and is returned unchanged, so the
  helper can be applied uniformly to sparse and already-dense features.

  Args:
    x: A `SparseTensor` of rank 2 whose dense shape should have size at most 1
      in the second dimension, or an already-dense tensor.

  Returns:
    A rank 1 tensor where missing values of `x` have been filled in, or `x`
    itself when it is not a `SparseTensor`.
  """
  # Dense input has no `indices`/`values`/`dense_shape`; pass it through
  # instead of failing with an AttributeError.
  if not isinstance(x, tf.SparseTensor):
    return x

  default_value = '' if x.dtype == tf.string else 0
  # Rebuild the sparse tensor with a second dimension of exactly 1 so the
  # densified result can be squeezed down to rank 1.
  single_column = tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1])
  dense = tf.sparse.to_dense(single_column, default_value)
  return tf.squeeze(dense, axis=1)

This one just passes the raw 'Text' column through unchanged and computes a vocabulary on the output category.

My problem arises when I try to build a model that includes the

tensorflow.keras.layers.experimental.preprocessing.TextVectorization

in the layers of my model. I mean, I can include it pretty easily in a model like this:

def _build_keras_model(vectorize_layer: TextVectorization) -> tf.keras.Model:
  """Creates a Keras model for classifying documents.

  The model takes raw strings, vectorizes them, embeds the tokens, averages
  the embeddings over the sequence and predicts one of 5 classes.

  Args:
    vectorize_layer: TextVectorization layer applied directly to the raw
      string input, as the first layer of the model.

  Returns:
    A compiled keras Model whose 'predictions' output holds one probability
    per class.
  """

  # Scalar string input, named after the transformed text feature.
  input_layer = tf.keras.layers.Input(name="Text_xf", shape=(), dtype=tf.string)

  # The first layer in our model is the vectorization layer. The Embedding
  # layer that follows consumes integer token ids, so the vectorization layer
  # is expected to be configured with an integer output mode.
  deep = vectorize_layer(input_layer)
  deep = layers.Embedding(_max_features + 1, _embedding_dim)(deep)
  deep = layers.Dropout(0.2)(deep)
  deep = layers.GlobalAveragePooling1D()(deep)
  deep = layers.Dropout(0.2)(deep)

  # Softmax turns the 5 class scores into a probability distribution, which
  # matches the loss below (from_logits=False). The previous combination of a
  # sigmoid output with from_logits=True made the loss treat already-squashed
  # values as raw logits.
  output = layers.Dense(5, activation='softmax', name='predictions')(deep)

  # Compile the model with sparse categorical crossentropy (integer class
  # labels) and an adam optimizer.
  model = tf.keras.Model(input_layer, output)
  model.compile(
    loss=losses.SparseCategoricalCrossentropy(from_logits=False),
    optimizer='adam',
    metrics=['accuracy'])

  return model

And this works. But when I try to fit it with my dataset I get this:

TypeError: in user code:

    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:806 train_function  *
        return step_function(self, iterator)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:796 step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:1211 run
        return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2585 call_for_each_replica
        return self._call_for_each_replica(fn, args, kwargs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/distribute/distribute_lib.py:2945 _call_for_each_replica
        return fn(*args, **kwargs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:789 run_step  **
        outputs = model.train_step(data)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/training.py:747 train_step
        y_pred = self(x, training=True)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:985 __call__
        outputs = call_fn(inputs, *args, **kwargs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/functional.py:386 call
        inputs, training=training, mask=mask)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/functional.py:508 _run_internal_graph
        outputs = node.layer(*args, **kwargs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/engine/base_layer.py:985 __call__
        outputs = call_fn(inputs, *args, **kwargs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/layers/preprocessing/text_vectorization.py:571 call
        inputs = self._preprocess(inputs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/keras/layers/preprocessing/text_vectorization.py:527 _preprocess
        lowercase_inputs = gen_string_ops.string_lower(inputs)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/ops/gen_string_ops.py:1028 string_lower
        "StringLower", input=input, encoding=encoding, name=name)
    /opt/miniconda3/envs/archiving/lib/python3.7/site-packages/tensorflow/python/framework/op_def_library.py:479 _apply_op_helper
        repr(values), type(values).__name__, err))

    TypeError: Expected string passed to parameter 'input' of op 'StringLower', got  of type 'SparseTensor' instead. Error: Expected string, got  of type 'SparseTensor' instead.

I'm fairly new to Tensorflow and I'm trying to understand the whole process of writing pipelines with TFX. I don't get why the vectorization layer does not accept a SparseTensor and expects a string tensor instead. I do know the implications of using experimental features, but if anyone has an idea, or can point out an obvious mistake I'm making, it would be great!

I'm running out of ideas to make this work.

Note: I thought it would come from the way I retrieve the dataset:

def _input_fn(file_pattern: List[Text],
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Builds the batched (features, label) dataset used for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    tf_transform_output: A TFTransformOutput; its transformed feature spec is
      used to parse the records.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  # Work on a copy of the spec rather than on the object returned by the
  # transform output.
  feature_spec = tf_transform_output.transformed_feature_spec().copy()
  label_name = _transformed_name(_LABEL_KEY)

  # The transformed label feature is split off as the second tuple element.
  return tf.data.experimental.make_batched_features_dataset(
      file_pattern=file_pattern,
      batch_size=batch_size,
      features=feature_spec,
      reader=_gzip_reader_fn,
      label_key=label_name)

I also use this like this:

def run_fn(fn_args: TrainerFnArgs):
  """Adapt the vectorizer, fit the Keras model and export it for serving.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  transform_output = tft.TFTransformOutput(fn_args.transform_output)

  training_data = _input_fn(fn_args.train_files, transform_output, 40)
  validation_data = _input_fn(fn_args.eval_files, transform_output, 40)

  # TODO: Make better method to adapt vectorizer layer
  # Read the transformed text column on its own and keep only the first
  # element of that dataset (presumably one batch of 978 -- confirm against
  # `_input_text_fn`) to adapt the vectorization layer.
  raw_text_spec = {
      _transformed_name('Text'): tf.io.FixedLenFeature([], dtype=tf.string),
  }
  adapt_data = _input_text_fn(fn_args.train_files, raw_text_spec, 978)
  adapt_data = adapt_data.map(
      lambda features: features[_transformed_name('Text')])
  adapt_data = adapt_data.take(1)

  vectorizer = get_vectorize_layer()
  vectorizer.adapt(adapt_data)
  model = _build_keras_model(vectorizer)

  # TensorBoard logs go in a 'logs' directory next to the serving model dir.
  serving_parent = os.path.dirname(fn_args.serving_model_dir)
  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=os.path.join(serving_parent, 'logs'), update_freq='batch')

  print(training_data)
  model.fit(
      training_data,
      steps_per_epoch=fn_args.train_steps,
      validation_data=validation_data,
      validation_steps=fn_args.eval_steps,
      callbacks=[tensorboard_callback])

  # Export with a serving signature that accepts a batch of serialized
  # examples as strings.
  serve_fn = _get_serve_tf_examples_fn(model, transform_output)
  examples_spec = tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
  signatures = {
      'serving_default': serve_fn.get_concrete_function(examples_spec),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Solution

  • I've managed to make it work, but not in the cleanest way.

    The reason I got this message is that the TextVectorization layer in the model will only accept a tensor (a dense one, it seems), a numpy array, a list or a dataset. So I gave it what it wants by adapting my code like this (this is the full updated function):

    def run_fn(fn_args: TrainerFnArgs):
      """Fit the text classifier on densified text and export it for serving.

      Args:
        fn_args: Holds args used to train the model as name/value pairs.
      """

      def _dense_text(features):
        # The transformed text feature is sparse; the vectorization layer
        # needs it dense.
        return tf.sparse.to_dense(features[_transformed_name('Text')])

      def _to_model_input(features, label):
        # Keep only the densified text as model input, paired with its label.
        return _dense_text(features), label

      tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

      train_dataset = _input_fn(fn_args.train_files, tf_transform_output, 40)
      eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output, 40)

      # Text-only view of the training data, unbatched, used to adapt the
      # vectorization layer.
      vectorize_dataset = train_dataset.map(
          lambda features, label: _dense_text(features))
      vectorize_dataset = vectorize_dataset.unbatch()

      vectorize_layer = TextVectorization(
          max_tokens=_max_features,
          output_mode='int',
          output_sequence_length=500)
      vectorize_layer.adapt(vectorize_dataset.take(900))
      model = _build_keras_model(vectorize_layer)

      # TensorBoard logs go in a 'logs' directory next to the serving model dir.
      serving_parent = os.path.dirname(fn_args.serving_model_dir)
      tensorboard_callback = tf.keras.callbacks.TensorBoard(
          log_dir=os.path.join(serving_parent, 'logs'), update_freq='batch')

      model.fit(
          train_dataset.map(_to_model_input),
          steps_per_epoch=fn_args.train_steps,
          validation_data=eval_dataset.map(_to_model_input),
          validation_steps=fn_args.eval_steps,
          callbacks=[tensorboard_callback])

      # Export with a serving signature that accepts a batch of serialized
      # examples as strings.
      serve_fn = _get_serve_tf_examples_fn(model, tf_transform_output)
      examples_spec = tf.TensorSpec(
          shape=[None], dtype=tf.string, name='examples')
      signatures = {
          'serving_default': serve_fn.get_concrete_function(examples_spec),
      }
      model.save(
          fn_args.serving_model_dir, save_format='tf', signatures=signatures)
    

    Notice the map functions in the parameters of the fit function. The rest stayed pretty much the same (I just adjusted the shape in the input layer and tweaked the model to get better results).

    I wonder if there is an easier way to achieve this and still keep the benefits of SparseTensor.