I am trying to train a model that detects whether someone is wearing sunglasses, using TFX and a subset of the CelebA dataset (~26k images). I have written the images and labels to a TFRecord that is 232.9 MB.
When I then go through the different components, I always run out of memory when running the Transform component. Is this normal? By the way, I am running this on a TPU runtime with 32 GB of RAM, as I am using Google Colab Pro.
If so, what would be the best way to overcome the problem? Just create many smaller records and pass them through the components one by one?
Here is the code I have been using:
Code for writing to TFRecord:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential
import tensorflow_datasets as tfds
import pandas as pd
import numpy as np
from PIL import Image
import shutil
import random
import os
import io
import matplotlib.pyplot as plt
%matplotlib inline
from google.colab import drive
drive.mount('/content/gdrive')
%cd gdrive/MyDrive/Machine_Learning_stuff/celebs/
RAW_SUNGLASSES_DIR='./sunglasses_classifier/sunglasses_imgs/raw/'
SUNGLASSES_TFRECORD_DIR= './sunglasses_classifier/data/rec_sunglasses/sunglasses_full.tfrecords'
def _bytes_feature(value):
    """Returns a bytes_list from a string / byte."""
    if isinstance(value, type(tf.constant(0))):
        value = value.numpy()  # BytesList won't unpack a string from an EagerTensor.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _float_feature(value):
    """Returns a float_list from a float / double."""
    return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def _int64_feature(value):
    """Returns an int64_list from a bool / enum / int / uint."""
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def image_resize_to_byte_array(image: Image.Image):
    """Resizes a PIL image to 256x256 and returns its JPEG-encoded bytes."""
    imgByteArr = io.BytesIO()
    image = image.resize((256, 256))
    image.save(imgByteArr, format="jpeg")
    imgByteArr = imgByteArr.getvalue()
    return imgByteArr
#Remove any corrupted files and non jpeg files
!find ${RAW_SUNGLASSES_DIR} -size 0 -exec rm {} +
!find ${RAW_SUNGLASSES_DIR} -type f ! -name "*.jpg" -exec rm {} +
image_labels = {}

for filename in os.listdir(RAW_SUNGLASSES_DIR + '1-sunglasses'):
    if '.jpg' in filename:
        file_path = os.path.join(RAW_SUNGLASSES_DIR, '1-sunglasses', filename)
        # print(file_path)
        image_labels[file_path] = 1

for filename in os.listdir(RAW_SUNGLASSES_DIR + 'no_sunglasses'):
    if '.jpg' in filename:
        file_path = os.path.join(RAW_SUNGLASSES_DIR, 'no_sunglasses', filename)
        # print(file_path)
        image_labels[file_path] = 0
# Create a dictionary with the features that are relevant.
def image_example(image_string, label):
    image_shape = tf.io.decode_jpeg(image_string).shape
    feature = {
        'label': _int64_feature(label),
        'image_raw': _bytes_feature(image_string),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))
with tf.io.TFRecordWriter(SUNGLASSES_TFRECORD_DIR) as writer:
    for filepath, label in image_labels.items():
        image_bytes = image_resize_to_byte_array(Image.open(filepath, mode='r'))
        # image_string = open(filepath, 'rb').read()
        tf_example = image_example(image_bytes, label)
        writer.write(tf_example.SerializeToString())
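Regarding my own suggestion above of writing many smaller records: a sharded version of the same writer loop would look roughly like this (the shard count and file naming are arbitrary choices on my part, just a sketch):
import math

NUM_SHARDS = 10  # arbitrary; each shard would then hold ~2.6k images / ~23 MB
items = list(image_labels.items())
shard_size = math.ceil(len(items) / NUM_SHARDS)

for shard in range(NUM_SHARDS):
    # hypothetical file naming next to the existing sunglasses_full.tfrecords
    shard_path = f'./sunglasses_classifier/data/rec_sunglasses/sunglasses_{shard:03d}.tfrecords'
    with tf.io.TFRecordWriter(shard_path) as writer:
        for filepath, label in items[shard * shard_size:(shard + 1) * shard_size]:
            image_bytes = image_resize_to_byte_array(Image.open(filepath, mode='r'))
            writer.write(image_example(image_bytes, label).SerializeToString())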
Code for TFX pipeline:
import tensorflow as tf
from tensorflow import keras
#import tensorflow_datasets as tfds
import os
import pprint
#import tfx
from tfx.components import ImportExampleGen
from tfx.components import ExampleValidator
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Transform
from tfx.components import Tuner
from tfx.components import Trainer
from tfx.proto import example_gen_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
# Location of the pipeline metadata store
_pipeline_root = 'pipeline/'
# Directory of the raw data files
_data_root = './data/rec_sunglasses/'
from google.colab import drive
drive.mount('/content/gdrive')
%cd gdrive/MyDrive/Machine_Learning_stuff/celebs/sunglasses_classifier/
context = InteractiveContext(pipeline_root=_pipeline_root)
#ExampleGen
example_gen = ImportExampleGen(input_base=_data_root)
context.run(example_gen)
#StatisticsGen
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
context.run(statistics_gen)
#SchemaGen
schema_gen = SchemaGen(
statistics=statistics_gen.outputs['statistics'],infer_feature_shape=True)
context.run(schema_gen)
#ExampleValidator
example_validator = ExampleValidator(
statistics=statistics_gen.outputs['statistics'],
schema=schema_gen.outputs['schema'])
context.run(example_validator)
#Transform
_transform_module_file = 'sunglasses_transform.py'
%%writefile {_transform_module_file}
import tensorflow as tf
import tensorflow_transform as tft
# Keys
_LABEL_KEY = "label"
_IMAGE_KEY = "image_raw"
def _transformed_name(key):
    return key + '_xf'

def _image_parser(image_str):
    '''converts the images to a float tensor'''
    image = tf.image.decode_image(image_str, channels=3)
    image = tf.reshape(image, (256, 256, 3))
    image = tf.cast(image, tf.float32)
    return image

def _label_parser(label_id):
    '''converts the labels to a float tensor'''
    label = tf.cast(label_id, tf.float32)
    return label
def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.

    Args:
        inputs: map from feature keys to raw not-yet-transformed features.

    Returns:
        Map from string feature key to transformed feature operations.
    """
    # Convert the raw image and labels to a float array.
    outputs = {
        _transformed_name(_IMAGE_KEY):
            tf.map_fn(
                _image_parser,
                tf.squeeze(inputs[_IMAGE_KEY], axis=1),
                dtype=tf.float32),
        _transformed_name(_LABEL_KEY):
            tf.map_fn(
                _label_parser,
                inputs[_LABEL_KEY],
                dtype=tf.float32)
    }
    # Scale the pixels from 0 to 1.
    outputs[_transformed_name(_IMAGE_KEY)] = tft.scale_to_0_1(
        outputs[_transformed_name(_IMAGE_KEY)])
    return outputs
When I then run the code below I always get a message after about 23 mins stating that my runtime was restarted because I ran out of RAM.
# Ignore TF warning messages
tf.get_logger().setLevel('ERROR')
# Setup the Transform component
transform = Transform(
examples=example_gen.outputs['examples'],
schema=schema_gen.outputs['schema'],
module_file=os.path.abspath(_transform_module_file))
# Run the component
context.run(transform)
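My best guess at where the memory goes (just a back-of-envelope calculation, assuming the OOM comes from materializing the decoded float images) is that the transformed data is far larger than the 232.9 MB TFRecord:
# Back-of-envelope only; this is my assumption, not something I have profiled.
num_images = 26_000                        # roughly the size of my CelebA subset
bytes_per_image = 256 * 256 * 3 * 4        # each image becomes 256x256x3 float32 in preprocessing_fn
print(num_images * bytes_per_image / 1e9)  # ~20 GB of raw floats vs. 232.9 MB of JPEG bytes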
I think the bug is in Transform itself, since I hit the OOM when I pass it a preprocessing_fn but do not hit the same issue when I pass the same preprocessing_fn to the function below. Secondly, I face the OOM even when there is only a training split. When you use Transform, isn't the Beam code essentially abstracted away from you, so how can you override what it is doing? The only solution I can see is to not use Transform and use something like the function below, the downside being that you lose all the ML Metadata and pipeline benefits, which are significant :(
# Imports for the standalone Beam/TFT version (the exact import path for
# RecordBatchToExamples may differ across tfx_bsl versions).
import os
import tempfile

import apache_beam as beam
import tensorflow_transform.beam as tft_beam
from tfx_bsl.public import tfxio
from tfx_bsl.coders.example_coder import RecordBatchToExamples

def transform_data(preprocessing_fn, source_path, source_tfrecord_prefix, experiment_path):
    # get_schema_from_transform is my own helper (not shown here).
    schema = get_schema_from_transform(source_path)
    source_tfrecord_path = f"{source_path}/tfrecords/{source_tfrecord_prefix}*"
    with beam.Pipeline() as pipeline:
        with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
            tfrecord_tfxio = tfxio.TFExampleRecord(
                file_pattern=source_tfrecord_path, schema=schema)
            raw_data = (
                pipeline
                | 'ReadData' >> beam.io.ReadFromTFRecord(
                    file_pattern=source_tfrecord_path, coder=beam.coders.BytesCoder())
                | 'DecodeTrainData' >> tfrecord_tfxio.BeamSource())
            raw_dataset = (raw_data, tfrecord_tfxio.TensorAdapterConfig())
            transformed_dataset, transform_fn = (
                raw_dataset | tft_beam.AnalyzeAndTransformDataset(
                    preprocessing_fn, output_record_batches=True))
            transformed_data, _ = transformed_dataset
            transform_fn_output = os.path.join(experiment_path, 'transform_output')
            tfrecord_file_path_prefix = os.path.join(experiment_path, 'tfrecords', experiment_path)
            data_written = (
                transformed_data
                | 'EncodeTrainData' >> beam.FlatMapTuple(
                    lambda batch, _: RecordBatchToExamples(batch))
                | beam.io.WriteToTFRecord(tfrecord_file_path_prefix))
            _ = (transform_fn
                 | 'WriteTransformFn' >> tft_beam.WriteTransformFn(transform_fn_output))
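A call to it looks roughly like this (the paths here are placeholders, not my real ones):
transform_data(
    preprocessing_fn=preprocessing_fn,
    source_path='./identity_transform',   # placeholder; expects ./identity_transform/tfrecords/
    source_tfrecord_prefix='train',       # placeholder shard prefix
    experiment_path='./experiment_1')     # placeholder output directory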
My code that generates the OOM is as follows:
from tfx import v1 as tfx
import tensorflow_data_validation as tfdv
TRAIN_DATA = 'train_smallest.csv'
LABEL_NAME = 'fare_amount'
BATCH_SIZE=256
ORIGINAL_TFRECORD_PATH='./identity_transform/tfrecords/'
from tfx.components import ImportExampleGen
from tfx.components import StatisticsGen
from tfx.components import SchemaGen
from tfx.components import Transform
from tfx.v1 import proto
from tfx.proto import example_gen_pb2
output = proto.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        proto.SplitConfig.Split(name='train', hash_buckets=3),
        proto.SplitConfig.Split(name='eval', hash_buckets=1)
    ]))
example_gen = ImportExampleGen(input_base=ORIGINAL_TFRECORD_PATH, output_config=output)
stats_options = tfdv.StatsOptions(label_feature=LABEL_NAME)
statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'], stats_options=stats_options)
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file='./preprocessing_fn.py')
components = [
    example_gen,
    statistics_gen,
    schema_gen,
    transform,
]
pipeline_name = 'pipeline'
pipeline_root = './pipeline'
metadata_path = './metadata/pipeline/metadata.db'
pipeline = tfx.dsl.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    metadata_connection_config=tfx.orchestration.metadata
        .sqlite_metadata_connection_config(metadata_path),
    components=components,
    enable_cache=False)
tfx.orchestration.LocalDagRunner().run(pipeline)
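For completeness, the only Beam-level knob I can find without abandoning Transform is the pipeline's beam_pipeline_args. Below is a sketch of what I assume that would look like with DirectRunner options (the flag values are guesses, and I have not confirmed this actually bounds Transform's memory):
# Sketch only: same pipeline as above, with Beam DirectRunner options passed through.
beam_args = [
    '--direct_running_mode=multi_processing',
    '--direct_num_workers=2',
]
pipeline = tfx.dsl.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    metadata_connection_config=tfx.orchestration.metadata
        .sqlite_metadata_connection_config(metadata_path),
    components=components,
    enable_cache=False,
    beam_pipeline_args=beam_args)
tfx.orchestration.LocalDagRunner().run(pipeline)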
My preprocessing_fn is:
import tensorflow as tf
import tensorflow_addons as tfa
import tensorflow_transform as tft
NBUCKETS = 10
def preprocessing_fn(inputs):
    """
    Preprocess input columns into transformed features. This is what goes
    into tensorflow transform/apache beam.
    """
    # Since we are modifying some features and leaving others unchanged, we
    # start by setting `outputs` to a copy of `inputs`.
    transformed = inputs.copy()
    del transformed["key"]

    transformed['passenger_count'] = tft.scale_to_0_1(
        inputs['passenger_count'])

    # Cannot use the below in TFT, as the learned values of the layer would
    # need to be managed carefully:
    # normalizer = tf.keras.layers.Normalization(axis=None,
    #     name="passenger_count_normalizer")
    # normalizer.adapt(inputs['passenger_count'])
    # transformed['other_passenger_count'] = normalizer(
    #     inputs['passenger_count'])

    for col in ['dropoff_longitude', 'dropoff_latitude']:
        # You can make this more robust by using the shape from the feature spec.
        transformed[col] = tft.sparse_tensor_to_dense_with_shape(
            inputs[col], default_value=tft.mean(inputs[col]), shape=[None, 1])

    for lon_col in ['pickup_longitude', 'dropoff_longitude']:
        # transformed[lon_col] = scale_longitude(inputs[lon_col])
        transformed[lon_col] = (transformed[lon_col] + 78) / 8.
    for lat_col in ['pickup_latitude', 'dropoff_latitude']:
        transformed[lat_col] = (transformed[lat_col] - 37) / 8.

    position_difference = tf.square(
        transformed["dropoff_longitude"] -
        transformed["pickup_longitude"])
    position_difference += tf.square(
        transformed["dropoff_latitude"] -
        transformed["pickup_latitude"])
    transformed['euclidean'] = tf.sqrt(position_difference)

    lat_lon_buckets = [
        bin_edge / NBUCKETS
        for bin_edge in range(0, NBUCKETS)]

    transformed['bucketed_pickup_longitude'] = tft.apply_buckets(
        transformed["pickup_longitude"],
        bucket_boundaries=tf.constant([lat_lon_buckets]))
    transformed["bucketed_pickup_latitude"] = tft.apply_buckets(
        transformed['pickup_latitude'],
        bucket_boundaries=tf.constant([lat_lon_buckets]))
    transformed['bucketed_dropoff_longitude'] = tft.apply_buckets(
        transformed["dropoff_longitude"],
        bucket_boundaries=tf.constant([lat_lon_buckets]))
    transformed['bucketed_dropoff_latitude'] = tft.apply_buckets(
        transformed["dropoff_latitude"],
        bucket_boundaries=tf.constant([lat_lon_buckets]))

    # transformed["pickup_cross"] = tf.sparse.cross(
    #     inputs=[transformed['pickup_latitude_apply_buckets'],
    #             transformed['pickup_longitude_apply_buckets']])

    hash_pickup_crossing_layer = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='one_hot', num_bins=NBUCKETS**2, name='hash_pickup_crossing_layer')
    transformed['pickup_location'] = hash_pickup_crossing_layer(
        (transformed['bucketed_pickup_latitude'],
         transformed['bucketed_pickup_longitude']))
    hash_dropoff_crossing_layer = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='one_hot', num_bins=NBUCKETS**2,
        name='hash_dropoff_crossing_layer')
    transformed['dropoff_location'] = hash_dropoff_crossing_layer(
        (transformed['bucketed_dropoff_latitude'],
         transformed['bucketed_dropoff_longitude']))

    hash_pickup_crossing_layer_intermediary = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='int', num_bins=NBUCKETS**2)
    hashed_pickup_intermediary = hash_pickup_crossing_layer_intermediary(
        (transformed['bucketed_pickup_longitude'],
         transformed['bucketed_pickup_latitude']))
    hash_dropoff_crossing_layer_intermediary = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='int', num_bins=NBUCKETS**2)
    hashed_dropoff_intermediary = hash_dropoff_crossing_layer_intermediary(
        (transformed['bucketed_dropoff_longitude'],
         transformed['bucketed_dropoff_latitude']))

    hash_trip_crossing_layer = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='one_hot', num_bins=NBUCKETS ** 3,
        name="hash_trip_crossing_layer")
    transformed['hashed_trip'] = hash_trip_crossing_layer(
        (hashed_pickup_intermediary,
         hashed_dropoff_intermediary))

    seconds_since_1970 = tf.cast(
        tfa.text.parse_time(
            inputs["pickup_datetime"],
            "%Y-%m-%d %H:%M:%S %Z",
            output_unit="SECOND"),
        tf.float32)
    # seconds_since_1970 = fn_seconds_since_1970(inputs['pickup_datetime'])
    seconds_since_1970 = tf.cast(seconds_since_1970, tf.float32)
    hours_since_1970 = seconds_since_1970 / 3600.
    hours_since_1970 = tf.floor(hours_since_1970)
    hour_of_day_intermediary = hours_since_1970 % 24
    transformed['hour_of_day'] = hour_of_day_intermediary
    hour_of_day_intermediary = tf.cast(hour_of_day_intermediary, tf.int32)

    days_since_1970 = seconds_since_1970 / (3600 * 24)
    days_since_1970 = tf.floor(days_since_1970)
    # January 1st 1970 was a Thursday.
    day_of_week_intermediary = (days_since_1970 + 4) % 7
    transformed['day_of_week'] = day_of_week_intermediary
    day_of_week_intermediary = tf.cast(day_of_week_intermediary, tf.int32)

    hashed_crossing_layer = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        num_bins=24 * 7, output_mode="one_hot")
    hashed_crossing_layer_intermediary = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        num_bins=24 * 7, output_mode="int", name='hashed_hour_of_day_of_week_layer')
    transformed['hour_of_day_of_week'] = hashed_crossing_layer(
        (hour_of_day_intermediary, day_of_week_intermediary))
    hour_of_day_of_week_intermediary = hashed_crossing_layer_intermediary(
        (hour_of_day_intermediary, day_of_week_intermediary))

    hash_trip_crossing_layer_intermediary = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='int', num_bins=NBUCKETS ** 3)
    hashed_trip_intermediary = hash_trip_crossing_layer_intermediary(
        (hashed_pickup_intermediary, hashed_dropoff_intermediary))

    hash_trip_and_time_layer = tf.keras.layers.experimental.preprocessing.HashedCrossing(
        output_mode='one_hot', num_bins=(NBUCKETS ** 3) * 4, name='hash_trip_and_time_layer')
    transformed['hashed_trip_and_time'] = hash_trip_and_time_layer(
        (hashed_trip_intermediary, hour_of_day_of_week_intermediary))

    return transformed
And the data that I'm using is as follows; this is before putting it into TFRecords with a preprocessing_fn that is essentially f(x) = x (a rough sketch of what I mean by that is after the CSV rows below):
key,fare_amount,pickup_datetime,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count
2010-10-19 00:01:17.0000003,11.3,2010-10-19 00:01:17 UTC,-73.948724,40.777489,-73.949569,40.814049,1
2009-08-19 08:58:00.00000031,8.5,2009-08-19 08:58:00 UTC,-74.007743,40.724717,-74.006797,40.751253,1
2011-01-24 06:48:12.0000001,10.9,2011-01-24 06:48:12 UTC,-73.986678,40.742597,-73.955101,40.766174,1
2014-09-13 09:08:00.000000126,15.5,2014-09-13 09:08:00 UTC,-74.00325,40.7083,-73.975935,40.749007,1
2013-04-12 19:12:22.0000002,10,2013-04-12 19:12:22 UTC,-74.005318,40.728261,-73.981724,40.7293,1
2015-03-11 11:09:40.0000003,13,2015-03-11 11:09:40 UTC,-73.998809814453125,40.734573364257812,-73.989830017089844,40.756542205810547,1
2014-02-26 01:06:40.0000001,8,2014-02-26 01:06:40 UTC,-73.985821,40.763299,-74.003947,40.751722,1
2011-12-03 02:34:21.0000002,15.7,2011-12-03 02:34:21 UTC,-73.940638,40.840057,-73.98792,40.768815,2
2012-12-10 15:50:03.0000002,34,2012-12-10 15:50:03 UTC,-73.872871,40.774284,-73.995264,40.739349,1
2013-09-22 21:15:18.0000001,7.5,2013-09-22 21:15:18 UTC,-73.996565,40.718924,-74.007011,40.707672,2
2011-06-13 20:19:00.00000010,8.1,2011-06-13 20:19:00 UTC,-73.981587,40.747238,-73.956932,40.771512,5
2013-04-13 02:28:55.0000006,4,2013-04-13 02:28:55 UTC,0,0,0,0,1
2013-09-08 15:49:49.0000001,5.5,2013-09-08 15:49:49 UTC,-73.96077,40.775805,-73.970084,40.76252,1
2011-10-05 21:59:00.00000098,6.5,2011-10-05 21:59:00 UTC,-74.005052,40.70663,-74.012255,40.718838,1
2014-10-03 04:04:00.00000020,5.5,2014-10-03 04:04:00 UTC,-74.000032,40.732362,-73.99655,40.736532,5
2010-06-18 13:41:28.0000001,12.1,2010-06-18 13:41:28 UTC,-73.998732,40.717089,-73.975146,40.758038,3
2014-08-07 12:06:50.0000001,13,2014-08-07 12:06:50 UTC,-74.008268,40.722489,-73.982861,40.744874,1
2009-08-30 11:56:58.0000005,5.7,2009-08-30 11:56:58 UTC,-74.007583,40.748017,-73.994615,40.751834,1
2012-01-28 01:58:00.000000132,9.7,2012-01-28 01:58:00 UTC,-74.422832,40.84662,-74.49563,40.968122,1
2012-06-01 13:33:00.00000070,6.9,2012-06-01 13:33:00 UTC,-73.987377,40.743832,-73.998615,40.739855,1
2014-07-31 20:00:00.000000140,14.5,2014-07-31 20:00:00 UTC,-73.983415,40.75677,-73.95324,40.77148,1
2009-02-06 09:25:00.000000128,6.9,2009-02-06 09:25:00 UTC,-73.994222,40.727137,-73.987398,40.744425,1
2012-01-07 20:28:30.0000003,6.9,2012-01-07 20:28:30 UTC,-73.9889,40.7591,-74.0053,40.7401,0
2012-06-13 15:57:00.000000105,8.5,2012-06-13 15:57:00 UTC,-73.982535,40.761602,-73.980302,40.748475,1
2014-05-02 10:33:00.00000042,5,2014-05-02 10:33:00 UTC,-73.985352,40.74213,-73.991562,40.742512,1
2013-03-30 00:33:00.000000133,15,2013-03-30 00:33:00 UTC,-73.98616,40.757022,-74.009022,40.707857,1
2011-05-20 23:01:00.00000071,9.3,2011-05-20 23:01:00 UTC,-73.951177,40.77465,-73.972487,40.743393,1
2011-01-27 21:51:00.000000171,8.9,2011-01-27 21:51:00 UTC,-73.989867,40.756748,-73.972143,40.786588,3
2009-03-20 12:46:25.0000001,6.9,2009-03-20 12:46:25 UTC,-73.951526,40.770003,-73.970998,40.754989,1
2013-05-01 09:32:00.000000143,7,2013-05-01 09:32:00 UTC,-73.990302,40.756552,-73.982462,40.760242,5
2010-05-25 10:10:00.00000027,4.9,2010-05-25 10:10:00 UTC,-73.980722,40.779832,-73.971522,40.787518,1
2012-12-18 07:24:00.000000148,6,2012-12-18 07:24:00 UTC,-73.965952,40.776297,-73.950533,40.774467,4
2012-04-18 08:36:00.000000182,7.7,2012-04-18 08:36:00 UTC,-73.98358,40.766182,-73.97922,40.752992,1
2009-05-23 17:11:00.00000092,42.9,2009-05-23 17:11:00 UTC,-73.781909,40.64477,-73.991234,40.687251,2
2013-04-16 08:44:39.0000006,12,2013-04-16 08:44:39 UTC,-73.961365,40.760555,-73.9805,40.753709,1
2014-04-15 18:39:47.0000002,3.5,2014-04-15 18:39:47 UTC,-74.008172,40.737866,-74.007745,40.732653,1
2013-12-22 17:42:24.0000001,8,2013-12-22 17:42:24 UTC,-73.977505,40.742731,-73.980127,40.722385,2
2011-09-07 20:12:37.0000004,14.5,2011-09-07 20:12:37 UTC,-74.002225,40.715,-73.951187,40.728798,1
2014-12-10 21:57:46.0000005,6.5,2014-12-10 21:57:46 UTC,-73.97175,40.760287,0,0,1
2012-11-12 22:11:00.000000101,6.5,2012-11-12 22:11:00 UTC,-73.983615,40.760325,-73.998885,40.760012,1
2015-02-21 19:26:56.0000005,15,2015-02-21 19:26:56 UTC,-73.9959716796875,40.686809539794922,-74.009872436523438,40.718009948730469,1
2010-10-06 03:28:42.0000001,8.9,2010-10-06 03:28:42 UTC,-73.988341,40.72886,-73.984581,40.751519,1
2010-07-09 22:01:22.0000007,4.1,2010-07-09 22:01:22 UTC,-73.959272,40.771833,-73.962304,40.773124,1
2009-05-28 10:45:00.00000032,8.9,2009-05-28 10:45:00 UTC,-73.988872,40.753367,-73.972733,40.753327,1
2013-09-24 18:25:00.000000123,25,2013-09-24 18:25:00 UTC,-74.005197,40.751602,-73.980317,40.676607,1
2009-02-05 08:23:01.0000004,7.3,2009-02-05 08:23:01 UTC,-73.975468,40.759635,-73.991854,40.749352,1
2011-07-03 22:25:04.0000003,28.5,2011-07-03 22:25:04 UTC,-73.776755,40.64523,-73.951802,40.657781,1
2010-06-07 15:20:00.000000164,17.7,2010-06-07 15:20:00 UTC,-73.969625,40.758133,-74.012548,40.713983,1
2012-09-14 01:02:00.000000106,7,2012-09-14 01:02:00 UTC,-73.982777,40.744722,-73.984505,40.732127,1
2013-03-08 23:16:00.00000051,25.5,2013-03-08 23:16:00 UTC,-73.990822,40.734702,-73.945737,40.651117,1
2009-10-30 16:06:00.00000074,10.1,2009-10-30 16:06:00 UTC,-73.981008,40.768257,-73.968412,40.80144,2
2012-12-13 19:08:47.0000004,4.5,2012-12-13 19:08:47 UTC,-73.949347,40.78134,-73.956487,40.777995,1
2009-09-03 18:08:00.000000222,5.3,2009-09-03 18:08:00 UTC,-73.985702,40.753625,-73.989385,40.741143,5
2014-05-09 14:18:00.00000085,22.5,2014-05-09 14:18:00 UTC,-73.994697,40.765992,-74.012857,40.706807,1
2010-06-13 18:07:00.00000026,5.7,2010-06-13 18:07:00 UTC,-73.98811,40.774712,-73.98207,40.763285,1
2009-05-30 22:44:00.0000003,15.7,2009-05-30 22:44:00 UTC,-73.968862,40.791558,-73.971705,40.793732,2
2009-05-03 09:58:47.0000002,3.7,2009-05-03 09:58:47 UTC,-73.966445,40.804635,-73.969422,40.798094,1
2011-02-21 22:48:57.0000004,5.7,2011-02-21 22:48:57 UTC,-73.977624,40.752278,-73.994179,40.751649,1
2009-01-09 22:16:00.00000018,4.5,2009-01-09 22:16:00 UTC,-73.961832,40.763897,-73.96975,40.759523,1
2011-06-17 10:31:00.000000154,7.3,2011-06-17 10:31:00 UTC,-73.963242,40.771425,-73.968752,40.7537,1
2014-11-24 12:38:00.000000143,5,2014-11-24 12:38:00 UTC,-73.969662,40.753422,-73.975425,40.74545,1
2015-05-03 09:46:20.0000007,14.5,2015-05-03 09:46:20 UTC,-73.9213638305664,40.745254516601562,-73.85888671875,40.751045227050781,2
2010-04-05 18:49:28.0000001,8.9,2010-04-05 18:49:28 UTC,-73.960298,40.766187,-73.981875,40.727677,1
2013-12-17 20:12:00.00000030,6.5,2013-12-17 20:12:00 UTC,-73.956007,40.779077,-73.965855,40.765742,1
2010-03-12 21:29:00.000000235,9.3,2010-03-12 21:29:00 UTC,0,0,0,0,3
2011-11-30 17:09:00.000000158,14.1,2011-11-30 17:09:00 UTC,-73.985565,40.731717,-73.981347,40.77369,1
2011-12-18 22:21:00.000000162,12.1,2011-12-18 22:21:00 UTC,-73.995165,40.753117,-73.98827,40.725642,1
2010-09-02 14:39:00.000000201,4.1,2010-09-02 14:39:00 UTC,-73.978898,40.785037,-73.970818,40.789267,1
2014-08-22 16:00:00.00000098,6,2014-08-22 16:00:00 UTC,-73.97484,40.756925,-73.972712,40.761775,3
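The f(x) = x preprocessing_fn I mention above is, roughly, nothing more than this (sketch):
# Identity preprocessing_fn used to write the CSV rows into TFRecords unchanged.
def preprocessing_fn(inputs):
    return dict(inputs)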
@tensorflow-support let's solve this issue and make Transform and LocalDagRunner usable so people can solve cool problems with them!