
In TensorFlow 2.0, how to feed TFRecord data to keras model?


I've been trying to solve a classification problem, whose input data has 32 features and 16 labels, with a Deep Neural Network (DNN).

They look like this:

# Input data
shape=(32,), dtype=float32, 
np.array([-0.9349509 ,  0.24052018, -0.29364416,  1.2375807 , -0.15996791,
        0.32468656,  0.43856472,  0.00573635, -0.48105922,  0.09342893,
        0.63206947,  0.44424117,  0.31256443,  0.09699771,  0.31004518,
        0.8685253 ,  0.74120486,  0.65404135, -0.4084895 ,  0.07065713,
        0.33964285, -0.20022233, -0.29489437,  0.40699714,  0.27028704,
        0.74895304, -0.4846958 ,  0.22371463,  0.3422047 , -0.24379562,
        0.38614622,  0.01282159])
# Label (Class)
shape=(16,), dtype=int64, np.array([0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

I want to train my NN on over 100 million samples (a very large dataset), so it isn't possible to load the whole dataset into a Python array.
After googling, I found that tf.TFRecord helps me get around this capacity problem.

I followed the tutorial on the official TensorFlow site to write a TFRecord file, but I could not find how to load the TFRecord into a Keras model.

TFRecord Writing Procedure

I used a writer to write the dataset to a TFRecord file.

writer = tf.io.TFRecordWriter(filenames)

for i in range(number_of_sample):
    ...
    writer.write(serialize_example(input_data, label))
writer.close()
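
At this scale it is also common to shard the data across several TFRecord files rather than one. A minimal sketch, where get_sample is a hypothetical stand-in for however each (input_data, label) pair is actually produced:

num_shards = 100  # arbitrary; pick so each shard stays a manageable size
writers = [tf.io.TFRecordWriter('train-%05d-of-%05d.tfrecord' % (i, num_shards))
           for i in range(num_shards)]
for i in range(number_of_sample):
    input_data, label = get_sample(i)  # hypothetical data source
    writers[i % num_shards].write(serialize_example(input_data, label))
for w in writers:
    w.close()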

The serialize_example method is as follows:

def serialize_example(input_data, label):
    """
    Creates a tf.Example message ready to be written to a file.
    """
    # Create a dictionary mapping the feature name to the tf.Example-compatible data type.
    feature = {
        'feature': tf.train.Feature(float_list=tf.train.FloatList(value=input_data)),
        'label' : tf.train.Feature(int64_list=tf.train.Int64List(value=label))
    }

    # Create a Features message using tf.train.Example.
    example_proto = tf.train.Example(
        features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()
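
As a quick sanity check, a synthetic sample can be serialized and parsed straight back; the random data below is purely illustrative:

import numpy as np

input_data = np.random.randn(32).astype(np.float32)  # fake 32-feature vector
label = np.eye(16, dtype=np.int64)[5]                # one-hot class 5

serialized = serialize_example(input_data, label)

# Round-trip: decode the bytes back into a tf.train.Example.
example = tf.train.Example.FromString(serialized)
print(example.features.feature['label'].int64_list.value)
# [0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]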

TFRecord Reading Procedure

After writing the TFRecord files, I worked out how to read them back by parsing the serialized strings:

dataset = tf.data.TFRecordDataset(filenames=[filenames])
parsed_dataset = dataset.map(_parse_function, num_parallel_calls=8)
final_dataset = parsed_dataset.shuffle(buffer_size=number_of_sample).batch(10)

print(parsed_dataset)
# <ParallelMapDataset shapes: {feature: (32,), label: (16,)},
#  types: {feature: tf.float32, label: tf.int64}>

for parsed_record in parsed_dataset.take(1):
    print(repr(parsed_record))
'''
{'feature': <tf.Tensor: id=10730, shape=(32,), dtype=float32, numpy=
array([ 0.3584828 ,  0.43238872,  0.84813404, -0.23866414, -0.3381694 ,
       -0.6825514 , -0.20499012, -0.60198826,  0.12879704, -0.6152373 ,
        0.21901904,  0.10998161,  0.04357208, -0.19996743, -0.24080099,
       -0.6282675 ,  0.57822317,  0.10296232, -0.25011575, -0.3454151 ,
        0.6235647 , -0.12194595, -0.18114032, -1.4484204 , -0.11394399,
       -0.20868362, -0.00653742,  0.677903  ,  0.09619896, -0.6428113 ,
       -0.59125495,  0.22995417], dtype=float32)>, 
'label': <tf.Tensor: id=10731, shape=(16,), dtype=int64, 
numpy=array([0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0])>}
'''

The _parse_function method is as follows:

# Create a description of the features.
feature_description = {
    'feature': tf.io.FixedLenFeature([32, ], tf.float32),
    'label' : tf.io.FixedLenFeature([16, ], tf.int64)
}

def _parse_function(example_proto):
  # Parse the input `tf.Example` proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)
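
As an aside, with fixed-length records like these, parsing can also be done per batch via tf.io.parse_example, which is typically faster than parsing one record at a time. A sketch, not required for the fix below:

def _parse_batch(example_protos):
    # The same feature_description as above, applied to a whole batch at once.
    return tf.io.parse_example(example_protos, feature_description)

dataset = tf.data.TFRecordDataset(filenames=[filenames])
parsed_dataset = dataset.batch(10).map(_parse_batch, num_parallel_calls=8)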

Feed TFRecord (Training)

So far everything seemed to flow smoothly, but when I tried to feed this dataset into the Keras model, it produced an error.

Model definition and training

inputs = keras.Input(shape=(32, ), name='feature')
x = layers.Dense(1024, activation='linear', name='dense_input')(inputs)
outputs = layers.Dense(expected_output, activation='softmax', name='label')(x)

model = keras.Model(inputs=inputs, outputs=outputs)
model.compile(optimizer=tf.keras.optimizers.Adam(0.001),
              loss='categorical_crossentropy',
              metrics=['accuracy', 'categorical_crossentropy'])

model.fit(final_dataset)

The output was:

     1/Unknown - 0s 15ms/step
---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-41-bb3547f32c4a> in <module>()
      8 loss='categorical_crossentropy', metrics=['accuracy','categorical_crossentropy'])
      9 
---> 10 model.fit(final_dataset )

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training.py in fit(self, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, max_queue_size, workers, use_multiprocessing, **kwargs)
    726         max_queue_size=max_queue_size,
    727         workers=workers,
--> 728         use_multiprocessing=use_multiprocessing)
    729 
    730   def evaluate(self,

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in fit(self, model, x, y, batch_size, epochs, verbose, callbacks, validation_split, validation_data, shuffle, class_weight, sample_weight, initial_epoch, steps_per_epoch, validation_steps, validation_freq, **kwargs)
    322                 mode=ModeKeys.TRAIN,
    323                 training_context=training_context,
--> 324                 total_epochs=epochs)
    325             cbks.make_logs(model, epoch_logs, training_result, ModeKeys.TRAIN)
    326 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2.py in run_one_epoch(model, iterator, execution_function, dataset_size, batch_size, strategy, steps_per_epoch, num_samples, mode, training_context, total_epochs)
    121         step=step, mode=mode, size=current_batch_size) as batch_logs:
    122       try:
--> 123         batch_outs = execution_function(iterator)
    124       except (StopIteration, errors.OutOfRangeError):
    125         # TODO(kaftan): File bug about tf function and errors.OutOfRangeError?

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in execution_function(input_fn)
     84     # `numpy` translates Tensors to values in Eager mode.
     85     return nest.map_structure(_non_none_constant_value,
---> 86                               distributed_function(input_fn))
     87 
     88   return execution_function

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in __call__(self, *args, **kwds)
    455 
    456     tracing_count = self._get_tracing_count()
--> 457     result = self._call(*args, **kwds)
    458     if tracing_count == self._get_tracing_count():
    459       self._call_counter.called_without_tracing()

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _call(self, *args, **kwds)
    501       # This is the first call of __call__, so we have to initialize.
    502       initializer_map = object_identity.ObjectIdentityDictionary()
--> 503       self._initialize(args, kwds, add_initializers_to=initializer_map)
    504     finally:
    505       # At this point we know that the initialization is complete (or less

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in _initialize(self, args, kwds, add_initializers_to)
    406     self._concrete_stateful_fn = (
    407         self._stateful_fn._get_concrete_function_internal_garbage_collected(  # pylint: disable=protected-access
--> 408             *args, **kwds))
    409 
    410     def invalid_creator_scope(*unused_args, **unused_kwds):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _get_concrete_function_internal_garbage_collected(self, *args, **kwargs)
   1846     if self.input_signature:
   1847       args, kwargs = None, None
-> 1848     graph_function, _, _ = self._maybe_define_function(args, kwargs)
   1849     return graph_function
   1850 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _maybe_define_function(self, args, kwargs)
   2148         graph_function = self._function_cache.primary.get(cache_key, None)
   2149         if graph_function is None:
-> 2150           graph_function = self._create_graph_function(args, kwargs)
   2151           self._function_cache.primary[cache_key] = graph_function
   2152         return graph_function, args, kwargs

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/function.py in _create_graph_function(self, args, kwargs, override_flat_arg_shapes)
   2039             arg_names=arg_names,
   2040             override_flat_arg_shapes=override_flat_arg_shapes,
-> 2041             capture_by_value=self._capture_by_value),
   2042         self._function_attributes,
   2043         # Tell the ConcreteFunction to clean up its graph once it goes out of

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/framework/func_graph.py in func_graph_from_py_func(name, python_func, args, kwargs, signature, func_graph, autograph, autograph_options, add_control_dependencies, arg_names, op_return_value, collections, capture_by_value, override_flat_arg_shapes)
    913                                           converted_func)
    914 
--> 915       func_outputs = python_func(*func_args, **func_kwargs)
    916 
    917       # invariant: `func_outputs` contains only Tensors, CompositeTensors,

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/eager/def_function.py in wrapped_fn(*args, **kwds)
    356         # __wrapped__ allows AutoGraph to swap in a converted function. We give
    357         # the function a weak reference to itself to avoid a reference cycle.
--> 358         return weak_wrapped_fn().__wrapped__(*args, **kwds)
    359     weak_wrapped_fn = weakref.ref(wrapped_fn)
    360 

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in distributed_function(input_iterator)
     71     strategy = distribution_strategy_context.get_strategy()
     72     outputs = strategy.experimental_run_v2(
---> 73         per_replica_function, args=(model, x, y, sample_weights))
     74     # Out of PerReplica outputs reduce or pick values to return.
     75     all_outputs = dist_utils.unwrap_output_dict(

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in experimental_run_v2(self, fn, args, kwargs)
    758       fn = autograph.tf_convert(fn, ag_ctx.control_status_ctx(),
    759                                 convert_by_default=False)
--> 760       return self._extended.call_for_each_replica(fn, args=args, kwargs=kwargs)
    761 
    762   def reduce(self, reduce_op, value, axis):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in call_for_each_replica(self, fn, args, kwargs)
   1785       kwargs = {}
   1786     with self._container_strategy().scope():
-> 1787       return self._call_for_each_replica(fn, args, kwargs)
   1788 
   1789   def _call_for_each_replica(self, fn, args, kwargs):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/distribute/distribute_lib.py in _call_for_each_replica(self, fn, args, kwargs)
   2130         self._container_strategy(),
   2131         replica_id_in_sync_group=constant_op.constant(0, dtypes.int32)):
-> 2132       return fn(*args, **kwargs)
   2133 
   2134   def _reduce_to(self, reduce_op, value, destinations):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/autograph/impl/api.py in wrapper(*args, **kwargs)
    290   def wrapper(*args, **kwargs):
    291     with ag_ctx.ControlStatusCtx(status=ag_ctx.Status.DISABLED):
--> 292       return func(*args, **kwargs)
    293 
    294   if inspect.isfunction(func) or inspect.ismethod(func):

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_v2_utils.py in train_on_batch(model, x, y, sample_weight, class_weight, reset_metrics)
    262       y,
    263       sample_weights=sample_weights,
--> 264       output_loss_metrics=model._output_loss_metrics)
    265 
    266   if reset_metrics:

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in train_on_batch(model, inputs, targets, sample_weights, output_loss_metrics)
    309           sample_weights=sample_weights,
    310           training=True,
--> 311           output_loss_metrics=output_loss_metrics))
    312   if not isinstance(outs, list):
    313     outs = [outs]

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _process_single_batch(model, inputs, targets, output_loss_metrics, sample_weights, training)
    250               output_loss_metrics=output_loss_metrics,
    251               sample_weights=sample_weights,
--> 252               training=training))
    253       if total_loss is None:
    254         raise ValueError('The model cannot be run '

/home/dbadmin/anaconda3/lib/python3.7/site-packages/tensorflow_core/python/keras/engine/training_eager.py in _model_loss(model, inputs, targets, output_loss_metrics, sample_weights, training)
    164 
    165         if hasattr(loss_fn, 'reduction'):
--> 166           per_sample_losses = loss_fn.call(targets[i], outs[i])
    167           weighted_losses = losses_utils.compute_weighted_loss(
    168               per_sample_losses,

IndexError: list index out of range

I don't know what list this refers to.


Solution

  • I'm doing something similar in TF 2.0 with a couple of differences that may address your issues. Separate parsed_record into features and label:

        feature, label = parsed_record['feature'], parsed_record['label']
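
    Applied to the question's parse function, that means returning a (features, label) tuple instead of a dict. A minimal sketch using the same feature_description as above:

        def _parse_function(example_proto):
            parsed = tf.io.parse_single_example(example_proto, feature_description)
            # Keras expects (features, labels) pairs, not a single dict.
            return parsed['feature'], parsed['label']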
    

    To keep getting batches from the dataset across epochs, use ds.repeat. Note that repeat takes an optional count (not a dataset), so chain it after shuffle and batch:

        ds = ds.shuffle(buffer_size=number_of_sample).batch(batch_size).repeat()
    

    My full input pipeline looks like:

    def _parse_function_same_side(example_proto):
        """Extracts features and labels.

        Args:
            example_proto: a serialized tf.Example protocol buffer.
        Returns:
            A `tuple` `(features, labels)`:
              features: a tensor representing the features.
              labels: a one-hot tensor with the corresponding labels.
        """
        feature_description = {
            "features": tf.io.FixedLenFeature(4, tf.int64),
            "label": tf.io.FixedLenFeature(1, tf.int64)
        }

        parsed_features = tf.io.parse_single_example(example_proto, feature_description)

        features = parsed_features['features']

        # hero_vocab is this answer's own label vocabulary; depth just needs
        # to match the number of classes.
        labels = tf.one_hot(parsed_features['label'], depth=len(hero_vocab))
        return features, labels
    
    def _input_fn(input_filenames, num_epochs=None,
                  shuffle=True, batch_size=50, compression_type=""):

        ds = tf.data.TFRecordDataset(input_filenames, compression_type=compression_type)
        ds = ds.map(_parse_function_same_side)

        # Only shuffle when the shuffle flag is set.
        if shuffle:
            ds = ds.shuffle(10000)

        # Group the samples into batches of batch_size.
        ds = ds.batch(batch_size)

        # Repeat so batches can be drawn from the TFRecord indefinitely
        # (or num_epochs times).
        ds = ds.repeat(num_epochs)

        # Return the dataset.
        return ds
    

    After this, I just feed the dataset directly to my model.
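
    Adapted to the question's 32-feature / 16-label setup, the tail of the pipeline might look like this (file names and counts are placeholders, and _parse_function is the tuple-returning version sketched above):

        train_ds = tf.data.TFRecordDataset(['train.tfrecord'])
        train_ds = train_ds.map(_parse_function)
        train_ds = train_ds.shuffle(10000).batch(50).repeat()

        # repeat() makes the dataset infinite, so fit needs steps_per_epoch.
        model.fit(train_ds, steps_per_epoch=number_of_sample // 50, epochs=10)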