I am learning to create TensorFlow Extended pipelines and find them very useful. However, I have yet to figure out how to debug and test (tabular) data passing through these pipelines. I am aware that TensorFlow uses TFRecords/tf.Examples, which are protobufs.
These can be made human-readable by using a TFRecordDataset and tf.Example's ParseFromString method.
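For illustration, a minimal sketch of that approach (the file path is a placeholder):

import tensorflow as tf

# Print the first record as protobuf text (assumes an uncompressed
# TFRecord file; pass compression_type="GZIP" for gzipped files)
raw_dataset = tf.data.TFRecordDataset("path/to/your/tfrecord")
for raw_record in raw_dataset.take(1):
    example = tf.train.Example()
    example.ParseFromString(raw_record.numpy())
    print(example)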
Still, this format is hard to read.
How would one actually test the data? I feel like I'd need a pandas DataFrame. And since we have 100+ columns and different use cases, I can hardly define all the columns by hand each time I'd like to do this. Can I somehow use my schema for this? Thanks!
Edit: I'll accept @TheEngineer's answer, as it gave me the pivotal hint on how to achieve what I wanted. Still, I'd like to share my solution.
Disclaimer: I'm using this code just for testing and to see what's going on within my pipeline. Be careful using this code in production. There might be better and safer ways.
import sys

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow_data_validation as tfdv

# Our default values for missing values within the tfrecord. We'll restore them later
STR_NA_VALUE = "NA"
INT_NA_VALUE = -sys.maxsize - 1
FLOAT_NA_VALUE = float("nan")

# Create a dict containing FixedLenFeatures using our schema
def load_schema_as_feature_dict(schema_path):
    schema = tfdv.load_schema_text(schema_path)

    def convert_feature(feature):
        # Feature types in the schema proto: 1 = BYTES, 2 = INT, 3 = FLOAT
        if feature.type == 1:
            return tf.io.FixedLenFeature((), tf.string, STR_NA_VALUE)
        if feature.type == 2:
            return tf.io.FixedLenFeature((), tf.int64, INT_NA_VALUE)
        if feature.type == 3:
            return tf.io.FixedLenFeature((), tf.float32, FLOAT_NA_VALUE)
        raise ValueError("Non-implemented type {}".format(feature.type))

    return dict((feature.name, convert_feature(feature)) for feature in schema.feature)

def as_pandas_frame(tfrecord_path, schema_path):
    feature_dict = load_schema_as_feature_dict(schema_path)
    dataset = tf.data.TFRecordDataset(tfrecord_path, compression_type="GZIP")
    parsed_dataset = dataset.map(
        lambda serialized_example: tf.io.parse_single_example(serialized_example, feature_dict))
    df = pd.DataFrame(list(parsed_dataset.as_numpy_iterator()))

    # Restore NA values from the default values we had to set
    # (the builtins object/float replace the deprecated np.object/np.float aliases)
    for key, value in {object: str.encode(STR_NA_VALUE),
                       np.int64: INT_NA_VALUE,
                       float: FLOAT_NA_VALUE}.items():
        type_columns = df.select_dtypes(include=[key]).columns
        df[type_columns] = df[type_columns].replace({value: None})
    return df
Now, you'll just need to call this function with your stored tfrecords and your schema.pbtxt file:
df = as_pandas_frame("path/to/your/tfrecord.gz", "path/to/your/schema.pbtxt")
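From there, the actual checks are plain pandas assertions. A minimal sketch of what I run (the column names "user_id" and "age" are made-up placeholders):

# Hypothetical sanity checks on the recovered frame
assert not df.empty
assert df["user_id"].notna().all()      # required field is always set
assert df["age"].between(0, 120).all()  # values fall in a plausible range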
I'm not sure what you mean by TFRecordDataset being hard to read. But here is an example of how I would use my TFRecord data. The feature_description dict contains the features that each sample in the TFRecord holds (and their data types). Once you load the records this way, you can do all sorts of things with them, including batching, augmenting, and shuffling in the pipeline, or accessing the individual samples and converting them to NumPy, etc.
import tensorflow as tf
import numpy as np
from PIL import Image

filenames = []
for i in range(128):
    name = "./../result/validation-%.5d-of-%.5d" % (i, 128)
    filenames.append(name)

def read_tfrecord(serialized_example):
    feature_description = {
        'image/height': tf.io.FixedLenFeature((), tf.int64),
        'image/width': tf.io.FixedLenFeature((), tf.int64),
        'image/colorspace': tf.io.FixedLenFeature((), tf.string),
        'image/channels': tf.io.FixedLenFeature((), tf.int64),
        'image/class/label': tf.io.FixedLenFeature((), tf.int64),
        'image/encoded': tf.io.FixedLenFeature((), tf.string),
    }
    parsed_features = tf.io.parse_single_example(serialized_example, feature_description)
    # Decode the JPEG bytes into an image tensor
    parsed_features['image/encoded'] = tf.io.decode_jpeg(
        parsed_features['image/encoded'], channels=3)
    return parsed_features

data = tf.data.TFRecordDataset(filenames)
# Note: batching decoded images only works if they all share the same shape
parsed_dataset = data.shuffle(128).map(read_tfrecord).batch(128)

for sample in parsed_dataset.take(1):
    # sample is a whole batch, so display the first image in it
    numpyed = sample['image/encoded'].numpy()
    img = Image.fromarray(numpyed[0], 'RGB')
    img.show()
    tf.print(sample['image/class/label'])