I am trying this code on Dataflow. It reads from a CSV file in a gs:// storage bucket, creates a BigQuery table, and appends the data. The code is as follows:
from __future__ import absolute_import
import argparse
import logging
import os
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage
import regex as re
# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://xxx/DUMMY.csv']
class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""

    def parse_method(self, string_input):
        values = re.split(",", re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row
def run(argv=None):
    """Main entry point; defines and runs the pipeline."""
    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # ignore the csv header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each of the string elements read in the beam.io.ReadAllFromText transform
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             'DUMMY',
             dataset='test',
             schema={
                 "fields": [
                     {
                         "name": "ID",
                         "type": "FLOAT",
                         "mode": "REQUIRED"
                     },
                     {
                         "name": "CLUSTERED",
                         "tpe": "FLOAT",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "SCATTERED",
                         "tpe": "FLOAT",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "RANDOMISED",
                         "tpe": "FLOAT",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "RANDOM_STRING",
                         "tpe": "STRING",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "SMALL_VC",
                         "tpe": "INTEGER",
                         "mode": "NULLABLE"
                     },
                     {
                         "name": "PADDING",
                         "tpe": "STRING",
                         "mode": "NULLABLE"
                     }
                 ]
             },
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
The code throws this error:
File "/usr/local/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 976, in __setattr__
"to message %s" % (name, type(self).__name__))
AttributeError: May not assign arbitrary value tpe to message TableFieldSchema [while running 'Write to BigQuery/WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables']
If I try the following
schema='SCHEMA_AUTODETECT'
instead of the JSON schema above, it works fine. However, if I try to reference a JSON file in schema=, it also fails. What is the root cause of this? It is my first Dataflow/Beam program and I would appreciate any advice.
This is because schema accepts either a string of the form 'field1:type1,field2:type2,field3:type3' (a comma-separated list of field definitions) or a TableSchema object. Note also that your schema dictionary spells the key as "tpe" instead of "type" for most fields, which is exactly what the AttributeError ("May not assign arbitrary value tpe to message TableFieldSchema") is complaining about. You may refer to this documentation: https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery
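If you prefer to keep an explicit schema object, here is a minimal sketch of the TableSchema approach (build_table_schema is just an illustrative helper name; the TableSchema and TableFieldSchema classes ship with the Beam SDK):

from apache_beam.io.gcp.internal.clients import bigquery

def build_table_schema():
    """Builds a TableSchema equivalent to the comma-separated string used below."""
    table_schema = bigquery.TableSchema()
    fields = [
        ('ID', 'FLOAT', 'REQUIRED'),
        ('CLUSTERED', 'FLOAT', 'NULLABLE'),
        ('SCATTERED', 'FLOAT', 'NULLABLE'),
        ('RANDOMISED', 'FLOAT', 'NULLABLE'),
        ('RANDOM_STRING', 'STRING', 'NULLABLE'),
        ('SMALL_VC', 'INTEGER', 'NULLABLE'),
        ('PADDING', 'STRING', 'NULLABLE'),
    ]
    for name, field_type, mode in fields:
        field = bigquery.TableFieldSchema()
        field.name = name
        field.type = field_type  # note: the attribute is 'type', not 'tpe'
        field.mode = mode
        table_schema.fields.append(field)
    return table_schema

# ...then pass schema=build_table_schema() to WriteToBigQuery.

The full pipeline below uses the simpler comma-separated string form instead: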
from __future__ import absolute_import
import argparse
import logging
import os
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import re
# details and location of source csv file
source_file = ['gs://<your_bucket>/1.csv']
class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""

    def parse_method(self, string_input):
        values = re.split(",", re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row
def run(argv=None):
    """Main entry point; defines and runs the pipeline."""
    data_ingestion = DataIngestion()
    pipeline_options = PipelineOptions(
        runner='DirectRunner',
        project='your-project',
        temp_location='gs://<your_bucket>/temp',
        region='us-central1')
    p = beam.Pipeline(options=pipeline_options)
    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # ignore the csv header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each of the string elements read in the beam.io.ReadAllFromText transform
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             'your_table',
             dataset='your_dataset',
             schema='ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,RANDOM_STRING:STRING,SMALL_VC:INTEGER,PADDING:STRING',
             create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
Input CSV file:
ID,CLUSTERED,SCATTERED,RANDOMISED,RANDOM_STRING,SMALL_VC,PADDING
12,5,23,55,team,5,bigdata
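Regarding referencing a JSON file in schema=: WriteToBigQuery does not take a file path, but you can read the file yourself and convert its contents with parse_table_schema_from_json. A minimal sketch, assuming a local file named schema.json whose top-level content is {"fields": [...]} with correctly spelled "type" keys; load_schema is just an illustrative helper, and in recent SDKs the converter lives in apache_beam.io.gcp.bigquery_tools (older versions exposed it from apache_beam.io.gcp.bigquery):

from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

def load_schema(path='schema.json'):
    """Reads a JSON schema file and converts it to a TableSchema object."""
    with open(path) as f:
        # parse_table_schema_from_json expects a JSON string such as
        # '{"fields": [{"name": "ID", "type": "FLOAT", "mode": "REQUIRED"}, ...]}'
        return parse_table_schema_from_json(f.read())

# ...then pass schema=load_schema() to WriteToBigQuery.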