How to load new parts of Dataset dynamically during training of an Estimator?


I have an interesting problem.

I am doing regression on a large dataset (15M rows, 16 cols) using tf.Estimator and I used the common way to load data into tf.Dataset:

def input_fn_train(features, labels, batch_size, repeat_count):

    dataset = tf.data.Dataset.from_tensor_slices((dict(features), labels))
    dataset = dataset.shuffle(len(labels)).repeat(repeat_count).batch(batch_size)
    return dataset

features and labels are pandas DataFrames. This input_fn works fine for smaller data (up to a couple of million rows), but with the whole dataset it raises:

[libprotobuf FATAL external/protobuf_archive/src/google/protobuf/message_lite.cc:68] CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization. terminate called after throwing an instance of 'google::protobuf::FatalException' what(): CHECK failed: (byte_size_before_serialization) == (byte_size_after_serialization): tensorflow.GraphDef was modified concurrently during serialization.

The cause of this error is that calling .from_tensor_slices() on explicit data (not on placeholders) embeds that data in the graph as tf.constant() ops. A serialized tf.GraphDef protocol buffer is limited to 2 GB, and my data is too big for that.
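As a rough back-of-envelope check, the sketch below compares the raw size of the embedded values with the protobuf limit. The 8 bytes per value (float64) and the single separate label column are assumptions, since the question does not state dtypes, and raw_payload_bytes is a hypothetical helper, not a TensorFlow function.

```python
# Protocol buffers cap a serialized message at 2**31 - 1 bytes (the "2 GB limit").
GRAPHDEF_LIMIT_BYTES = 2**31 - 1


def raw_payload_bytes(rows, feature_cols, bytes_per_value=8, label_cols=1):
    """Bytes needed to embed every feature and label value as graph constants.

    bytes_per_value=8 assumes float64 data; this is a guess about the dataset.
    """
    return rows * (feature_cols + label_cols) * bytes_per_value


# 15M rows, 16 feature columns plus one assumed label column.
payload = raw_payload_bytes(rows=15_000_000, feature_cols=16)
fraction_of_limit = payload / GRAPHDEF_LIMIT_BYTES

print(payload)                      # 2040000000
print(round(fraction_of_limit, 2))  # fraction of the limit used by raw values alone
```

Under these assumptions the raw values alone already take up about 95% of the limit, before counting any other nodes or per-op overhead in the graph, so a dataset of this size leaves essentially no headroom.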

The TensorFlow docs mention this limit and suggest a workaround:

"As an alternative, you can define the Dataset in terms of tf.placeholder() tensors, and feed the NumPy arrays when you initialize an Iterator over the dataset."
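Outside of an Estimator, the approach from the docs looks roughly like the sketch below. It uses toy NumPy arrays in place of the real data and goes through tf.compat.v1 so that it also runs on newer TensorFlow versions; on TF 1.x the same calls are available directly under tf. The variable names are illustrative only.

```python
import numpy as np
import tensorflow as tf

tf1 = tf.compat.v1  # TF 1.x-style graph API

# Toy stand-ins for the real feature and label arrays.
features = np.arange(20, dtype=np.float32).reshape(10, 2)
labels = np.arange(10, dtype=np.float32)

with tf.Graph().as_default():
    # The graph only stores the placeholders, not the data itself.
    features_ph = tf1.placeholder(features.dtype, features.shape)
    labels_ph = tf1.placeholder(labels.dtype, labels.shape)

    dataset = tf.data.Dataset.from_tensor_slices((features_ph, labels_ph)).batch(4)
    iterator = tf1.data.make_initializable_iterator(dataset)
    next_batch = iterator.get_next()

    with tf1.Session() as sess:
        # The arrays are fed once, when the iterator is initialized.
        sess.run(iterator.initializer,
                 feed_dict={features_ph: features, labels_ph: labels})
        first_features, first_labels = sess.run(next_batch)
```

The key step is the feed_dict passed to the run of iterator.initializer, which is exactly the call the Estimator makes on its own.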

This method would solve my problem, but the catch lies in the initialization, which I cannot access: there is no way to feed the actual values to the placeholders when the Estimator runs the initialization op of the dataset's iterator.

The dataset gets initialized inside tf.Estimator using the following hook:

class _DatasetInitializerHook(training.SessionRunHook):

    def __init__(self, iterator):
        self._iterator = iterator

    def begin(self):
        self._initializer = self._iterator.initializer

    def after_create_session(self, session, coord):
        del coord
        session.run(self._initializer)

As you can see, the initializer is run right after the session is created. The problem is that this session.run call is made on the raw session, independently of all other hooks. No hook is invoked for it, so there is no way to pass a feed_dict to populate the placeholders.
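This behaviour can be checked with a small sketch that uses tf.compat.v1.train.MonitoredSession in place of the Estimator (an assumption: the Estimator builds on the same hook mechanism, but its internals are not reproduced here). CountingHook and InitializerHook are hypothetical names; the second one copies the logic of the hook shown above.

```python
import tensorflow as tf

tf1 = tf.compat.v1  # TF 1.x-style graph API


class CountingHook(tf1.train.SessionRunHook):
    """Counts the run() calls that the monitored session reports to hooks."""

    def __init__(self):
        self.before_run_calls = 0

    def before_run(self, run_context):
        self.before_run_calls += 1


class InitializerHook(tf1.train.SessionRunHook):
    """Same logic as _DatasetInitializerHook: initialize on the raw session."""

    def __init__(self, iterator):
        self._iterator = iterator

    def begin(self):
        self._initializer = self._iterator.initializer

    def after_create_session(self, session, coord):
        del coord
        session.run(self._initializer)  # raw tf.Session, hooks are bypassed


with tf.Graph().as_default():
    iterator = tf1.data.make_initializable_iterator(tf.data.Dataset.range(3))
    next_element = iterator.get_next()
    counter = CountingHook()

    with tf1.train.MonitoredSession(
            hooks=[counter, InitializerHook(iterator)]) as sess:
        calls_after_init = counter.before_run_calls   # initializer already ran
        first_value = sess.run(next_element)          # goes through the hooks
        calls_after_fetch = counter.before_run_calls
```

In this sketch the counter stays at zero while the initializer runs and only increases for the later sess.run call, which matches the observation that other hooks never see the initialization run.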

I cannot initialize the iterator on my own, because there is no way to pass the iterator to the Estimator. The iterator gets initialized after the

A way around this would be to split my data into TFRecord files and load them directly using TensorFlow functions. However, this is a highly undesirable solution: our company's codebase has its own optimized binary data format, and additional files would cost a lot of space and I/O time, which is critical.

I think there are multiple solutions to my problem, but I haven't come up with any yet. If you have any idea or suggestion for how this could be done, please share it. Thank you!


Solution

  • Okay, I found a solution to my problem. It can be done using the Dataset.from_generator() function. My solution uses one generator to produce DataFrames and a second one to produce rows while iterating over those DataFrames.

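    import tensorflow as tf
    from numpy import arange
    from pandas import DataFrame

    # Toy data: 10 rows with a feature column 'x1' and a label column 'y1'.
    a = arange(20).reshape(10,2)
    df = DataFrame(a, columns=['x1','y1'])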
    
    
    def gen_partition():
        for i in range(2):
            df_partition = df.iloc[i * 5 : (i + 1) * 5]
            yield df_partition
    
    
    def gen_fields():
        for partition in gen_partition(): # type: DataFrame
            # itertuples() puts the index at row[0], so x1 is row[1] and y1 is row[2].
            for row in partition.itertuples():
                yield {'x1': row[1]}, row[2]
    
    
    def input_fn_gen():
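        # Output types and shapes must mirror the (features dict, label) pairs
        # that gen_fields yields.
        dataset = tf.data.Dataset.from_generator(
            gen_fields,
            ({'x1': tf.float32}, tf.float32),
            ({'x1': tf.TensorShape([])}, tf.TensorShape([])))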
    
        dataset = dataset.shuffle(20).repeat(20).batch(2).prefetch(1)
        return dataset
    
    
    feature_columns = [tf.feature_column.numeric_column('x1')]
    
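    # get_model_dir is a helper defined elsewhere (not shown) that returns the
    # model directory path. Named model_dir to avoid shadowing the built-in dir().
    model_dir = get_model_dir('linreg_test')
    
    tf.logging.set_verbosity('INFO')
    
    estimator = tf.estimator.LinearRegressor(
        feature_columns=feature_columns,
        model_dir=model_dir,
        label_dimension=1
    )
    
    # input_fn_gen takes no arguments, so it can be passed directly.
    estimator.train(input_fn=input_fn_gen)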
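The toy example above slices a DataFrame that is already in memory. For data that does not fit, the same two-level generator idea can load one partition at a time. The sketch below is plain Python with no TensorFlow dependency. load_partition is a hypothetical stand-in for whatever reads one chunk from your own storage format, and gen_rows, num_partitions and seed are illustrative names. Shuffling the partition order is an optional addition, not part of the solution above.

```python
import random


def load_partition(index, rows_per_partition=5):
    """Hypothetical loader: stands in for reading one chunk from a binary store.

    Returns (x1, y1) pairs that match the toy data above: row i is (2i, 2i + 1).
    """
    start = index * rows_per_partition
    return [(float(2 * i), float(2 * i + 1))
            for i in range(start, start + rows_per_partition)]


def gen_rows(num_partitions, seed=None):
    """Yield ({'x1': x}, y) pairs while holding only one partition in memory."""
    order = list(range(num_partitions))
    # Visit partitions in a shuffled order; the order depends on the seed.
    random.Random(seed).shuffle(order)
    for index in order:
        for x1, y1 in load_partition(index):
            yield {'x1': x1}, y1


rows = list(gen_rows(num_partitions=2, seed=0))
```

A generator like this can be handed to Dataset.from_generator() once it is wrapped in a zero-argument callable, for example lambda: gen_rows(2). Shuffling at the partition level may help because dataset.shuffle() with a buffer smaller than the dataset only mixes rows that are close to each other in the stream.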