Tags: python, google-bigquery, dataflow, apache-beam-io

Beam Python Dataflow, writing to BigQuery table with schema provided throws AttributeError: May not assign arbitrary value tpe to message


I am running this code on Dataflow. It reads a CSV file from a gs:// storage bucket, creates a BigQuery table, and appends the data. The code is as follows:

from __future__ import absolute_import

import argparse
import logging
import os

import apache_beam as beam
from apache_beam.io import ReadFromText, ReadAllFromText
from apache_beam.io import WriteToText
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from google.cloud import storage

import regex as re

# storage_client = storage.Client()
# bucket = storage_client.get_bucket('mybucket')
#
# blobs = bucket.list_blobs()
# l=list(blobs)
# x=[y.name for y in l]
# c=x[1:]
# print(len(c))
# details and location of source csv file
source_file = ['gs://xxx/DUMMY.csv']

class DataIngestion:
    """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""


    def parse_method(self, string_input):

        values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row

def run(argv=None):
    """Main entry point; defines and runs the pipeline."""

    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())


    (p
    | 'Create PCollection' >> beam.Create(source_file)
    | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  ## ignore the csv header
    | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
    | 'Write to BigQuery' >> beam.io.Write(
        beam.io.WriteToBigQuery(
            'DUMMY',
            dataset='test',
            schema={
                "fields": [
                    {"name": "ID", "type": "FLOAT", "mode": "REQUIRED"},
                    {"name": "CLUSTERED", "tpe": "FLOAT", "mode": "NULLABLE"},
                    {"name": "SCATTERED", "tpe": "FLOAT", "mode": "NULLABLE"},
                    {"name": "RANDOMISED", "tpe": "FLOAT", "mode": "NULLABLE"},
                    {"name": "RANDOM_STRING", "tpe": "STRING", "mode": "NULLABLE"},
                    {"name": "SMALL_VC", "tpe": "INTEGER", "mode": "NULLABLE"},
                    {"name": "PADDING", "tpe": "STRING", "mode": "NULLABLE"}
                ]
            },
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

The code throws this error:

  File "/usr/local/lib/python3.7/site-packages/apitools/base/protorpclite/messages.py", line 976, in __setattr__
    "to message %s" % (name, type(self).__name__))
AttributeError: May not assign arbitrary value tpe to message TableFieldSchema [while running 'Write to BigQuery/WriteToBigQuery/BigQueryBatchFileLoads/TriggerLoadJobsWithoutTempTables/TriggerLoadJobsWithoutTempTables']

If I use the following

schema='SCHEMA_AUTODETECT'

instead of the JSON schema above, it works fine. Likewise, if I try to reference a JSON file in schema=, it fails. What is the root cause of this? This is my first Dataflow/Beam program, and I would appreciate any advice.


Solution

  • This is because schema accepts either a string of the form 'field1:type1,field2:type2,field3:type3' (a comma-separated list of field definitions) or a TableSchema object. In your dictionary schema, the AttributeError comes from the misspelled key "tpe" (it should be "type"), which cannot be mapped onto a TableFieldSchema attribute. You may refer to this documentation: https://beam.apache.org/releases/pydoc/2.9.0/apache_beam.io.gcp.bigquery.html#apache_beam.io.gcp.bigquery.WriteToBigQuery. Below is a working example using the string form, followed by a sketch of the TableSchema alternative.

    from __future__ import absolute_import
    
    import argparse
    import logging
    import os
    
    import apache_beam as beam
    from apache_beam.io import ReadFromText, ReadAllFromText
    from apache_beam.io import WriteToText
    from apache_beam.metrics import Metrics
    from apache_beam.metrics.metric import MetricsFilter
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import SetupOptions
    
    
    import re 
    
    # storage_client = storage.Client()
    # bucket = storage_client.get_bucket('mybucket')
    #
    # blobs = bucket.list_blobs()
    # l=list(blobs)
    # x=[y.name for y in l]
    # c=x[1:]
    # print(len(c))
    # details and location of source csv file
    source_file = ['gs://<your_bucket>/1.csv']
    
    class DataIngestion:
        """A helper class which contains the logic to translate the source csv file into a format BigQuery will accept."""
    
    
        def parse_method(self, string_input):
    
            # strip quotes and CR/LF characters, then split the CSV line on commas
            values = re.split(",",re.sub('\r\n', '', re.sub(u'"', '', string_input)))
            row = dict(
                zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                    values))
            return row
    
    def run(argv=None):
        """Main entry point; defines and runs the pipeline."""
    
        data_ingestion = DataIngestion()
        
        pipeline_options = PipelineOptions(
        runner='DirectRunner',
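        # 'DirectRunner' executes the pipeline locally; use 'DataflowRunner' to run on Dataflow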
        project='your-project',
        temp_location='gs://<your_bucket>/temp',
        region='us-central1')
        
        p = beam.Pipeline(options=pipeline_options)
    
    
        (p
        | 'Create PCollection' >> beam.Create(source_file)
        | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  ## ignore the csv header
        | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s)) # s is each of the String elements read in the beam.io.ReadAllFromText transform, and we apply a lambda
        | 'Write to BigQuery' >> beam.io.Write(
        beam.io.WriteToBigQuery(
        'your_table',
        dataset='your_dataset',
        schema ='ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,RANDOM_STRING:STRING,SMALL_VC:INTEGER,PADDING:STRING',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
    
        result = p.run()
        result.wait_until_finish()
    
    
    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
    

    Input file csv:

    ID,CLUSTERED,SCATTERED,RANDOMISED,RANDOM_STRING,SMALL_VC,PADDING
    12,5,23,55,team,5,bigdata
    

    Output: (screenshot of the resulting BigQuery table)
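
    For reference, schema= also accepts a TableSchema object, and a JSON schema file can be converted with parse_table_schema_from_json rather than referenced by its path. The sketch below is a minimal illustration, assuming the same column layout as the DUMMY table; the local file name dummy_schema.json is hypothetical.

        from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
        from apache_beam.io.gcp.internal.clients import bigquery

        # Option 1: build a TableSchema object field by field.
        table_schema = bigquery.TableSchema()
        for name, field_type, mode in [
                ('ID', 'FLOAT', 'REQUIRED'),
                ('CLUSTERED', 'FLOAT', 'NULLABLE'),
                ('SCATTERED', 'FLOAT', 'NULLABLE'),
                ('RANDOMISED', 'FLOAT', 'NULLABLE'),
                ('RANDOM_STRING', 'STRING', 'NULLABLE'),
                ('SMALL_VC', 'INTEGER', 'NULLABLE'),
                ('PADDING', 'STRING', 'NULLABLE')]:
            field = bigquery.TableFieldSchema()
            field.name = name
            field.type = field_type  # the attribute is "type"; "tpe" is rejected
            field.mode = mode
            table_schema.fields.append(field)

        # Option 2: read a JSON schema file (same {"fields": [...]} shape) and
        # convert it; passing the file path directly to schema= does not work.
        with open('dummy_schema.json') as f:  # hypothetical local file
            table_schema_from_file = parse_table_schema_from_json(f.read())

        # Either object can then be passed to the sink in place of the string:
        # beam.io.WriteToBigQuery('your_table', dataset='your_dataset',
        #                         schema=table_schema, ...)

    When the field list is short, the string schema used in the working example above remains the simplest option.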