My requirement is to read data from csv file along with header and create the same structure in Google Dat a Store using Python with Dataflow. I had tried creating a sample code as below.
My Sample CSV is below,
First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"
My pyhton 2.7 code snippet is as below
import csv
import datetime
import logging
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue
class CSVtoDict(beam.DoFn):
"""Converts line into dictionary"""
def process(self, element, header):
rec = ""
element = element.encode('utf-8')
try:
for line in csv.reader([element]):
rec = line
if len(rec) == len(header):
data = {header.strip(): val.strip() for header, val in zip(header, rec)}
return [data]
else:
logging.info("row contains bad data")
except Exception:
pass
class CreateEntities(beam.DoFn):
"""Creates Datastore entity"""
def process(self, element):
entity = entity_pb2.Entity()
sku = int(element.pop('sku'))
element[1] = float(element[1])
element['salePrice'] = float(element['salePrice'])
element['name'] = unicode(element['name'].decode('utf-8'))
element['type'] = unicode(element['type'].decode('utf-8'))
element['url'] = unicode(element['url'].decode('utf-8'))
element['image'] = unicode(element['image'].decode('utf-8'))
element['inStoreAvailability'] = unicode(element['inStoreAvailability'])
datastore_helper.add_key_path(entity.key, 'Productx', sku)
datastore_helper.add_properties(entity, element)
return [entity]
class ProcessOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
dest='input',
type=str,
required=False,
help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
def dataflow(argv=None):
process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
(p
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
)
p.run().wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
dataflow()
I can able to load entities using data flow however i would like to parse CSV file from Header and then rows instead of hard coding the values in class CreateEntities and write the same in Data Store Entities.
Basically upload the same CSV file which is given as input to Data flow job with the rows. Can someone please help?
Required Output in Data Store for Key Actor:
First Name Last Name Date of Birth
Tom,Cruise "July 3, 1962"
Bruce,Willis "March 19, 1955"
Morgan,Freeman "June 1, 1937"
John,Wayne "May 26, 1907"
Edit: I had incorporated the code given by u and getting the below error now. I am using Python 2.7 and imported the respective libraries.Sorry i am very new to Python.
Error:
File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
"__main__", fname, loader, pkg_name)
File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
exec code in run_globals
File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
| beam.Map(read_header_from_filename))
File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable
Thanks,GS
Apache Beam parallelizes processing of your data by splitting up reading of the file across many workers which means that most workers will never read the header line at all.
What you want to do is join the rows read with the header line. Since the header line is a small amount of data, you can read it in as a separate PCollection and pass it as a side input to CSVtoDict.
Some example code to read your header line:
def read_header_from_filename(filename):
# note that depending on your newline character/file encoding, this may need to be modified
file_handle = FileSystems.open(filename)
header = file_handle.readline()
return header.split(',')
# Create PCollection containing header line
header = (p
| beam.Create(process_options.input)
| beam.Map(read_header_from_filename))
Your pipeline construction code becomes:
(p
| 'Reading input file' >> beam.io.ReadFromText(process_options.input)
| 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
| 'Create entities' >> beam.ParDo(CreateEntities())
| 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
)
p.run().wait_until_finish()