Tags: python, tensorflow, tensorflow2.0, tensorflow-datasets

Tensorflow model reading parquet files using IODataset


I have been attempting to use tfio.IODataset.from_parquet to train a model on large parquet files. Below is a minimal example of the parquet-loading procedure I am using:

import pandas as pd
import tensorflow_io as tfio

pd.DataFrame({'a': [.1, .2], 'b': [.01, .02]}).to_parquet('file.parquet')
ds = tfio.IODataset.from_parquet('file.parquet', columns=['a', 'b'])

for batch in ds.batch(5):
    print(batch)

OrderedDict([('a', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0.1, 0.2])>), ('b', <tf.Tensor: shape=(2,), dtype=float64, numpy=array([0.01, 0.02])>)])

The batched dataset has type OrderedDict with keys a and b. For training my model, I would like something more akin to a "dense" feature vector, rather than two separate keys in an OrderedDict. How can I convert the OrderedDict to a dense tuple?

Try 1

As per this example, I tried the following to transform the dataset into "dense" features:

def make_dense(features):
    features = tf.stack(list(features), axis=1)
    return features
ds = ds.map(make_dense)

Unfortunately, that throws errors. I have tried several variations on this theme, including

  1. changing axis=1 to axis=0
  2. using ds = ds.map(lambda *items: tf.stack(items)) instead of my make_dense function.

I imagine this is a very basic operation for IODataset; I just do not know how to accomplish it.
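(For what it's worth, the error in Try 1 likely comes from the fact that iterating an OrderedDict yields its *keys*, so `tf.stack(list(features))` tries to stack strings. Stacking `features.values()` instead may be enough; a minimal sketch, using an in-memory `tf.data.Dataset` of dicts as a stand-in for the parquet IODataset, which yields elements of the same shape:)

```python
import tensorflow as tf

# Stand-in for tfio.IODataset.from_parquet: each element is a dict of
# scalar tensors, one per column (assumption: same element structure).
ds = tf.data.Dataset.from_tensor_slices({'a': [0.1, 0.2], 'b': [0.01, 0.02]})

# Iterating a dict yields its keys (strings); stack the values instead.
ds = ds.map(lambda features: tf.stack(list(features.values())))

for batch in ds.batch(5):
    print(batch)  # a single (2, 2) float tensor
```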


Solution

  • Not the prettiest solution, but you could try something like this:

    import pandas as pd
    import tensorflow as tf
    import tensorflow_io as tfio

    pd.DataFrame({'a': [.1, .2], 'b': [.01, .02]}).to_parquet('file.parquet')
    ds = tfio.IODataset.from_parquet('file.parquet', columns=['a', 'b'])

    def option1(features):
        # Collect keys and values into two parallel stacked tensors.
        keys = tf.TensorArray(dtype=tf.string, size=0, dynamic_size=True)
        values = tf.TensorArray(dtype=tf.float64, size=0, dynamic_size=True)
        for k, v in features.items():
            keys = keys.write(keys.size(), k)
            values = values.write(values.size(), v)
        return (keys.stack(), values.stack())

    def option2(features):
        # Discard the keys and stack only the values into a dense vector.
        ta = tf.TensorArray(dtype=tf.float64, size=0, dynamic_size=True)
        for _, v in features.items():
            ta = ta.write(ta.size(), v)
        return ta.stack()
    
    option1_ds = ds.map(option1)
    for batch in option1_ds.batch(5):
        print(batch)
    
    print()
    
    option2_ds = ds.map(option2)
    for batch in option2_ds.batch(5):
        print(batch)
    
    (<tf.Tensor: shape=(2, 2), dtype=string, numpy=
    array([[b'a', b'b'],
           [b'a', b'b']], dtype=object)>, <tf.Tensor: shape=(2, 2), dtype=float64, numpy=
    array([[0.1 , 0.01],
           [0.2 , 0.02]])>)
    
    tf.Tensor(
    [[0.1  0.01]
     [0.2  0.02]], shape=(2, 2), dtype=float64)