Tags: python, csv, pyspark, google-bigquery, dataflow

Read a CSV file with Dataflow, but add two additional columns, op_type and op_time, before ingesting rows into Google BigQuery


I have Dataflow code that reads a CSV file from a storage bucket (gs://) and ingests it into a BigQuery table. The BigQuery table is already created, and the code below works fine.

import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataIngestion:
    """A helper class which contains the logic to translate the source CSV file into a format BigQuery will accept."""

    def parse_method(self, string_input):
        # Strip quotes and line endings, then split the CSV line into its values.
        values = re.split(",", re.sub('\r\n', '', re.sub('"', '', string_input)))
        # Zip the values with the column names to build a BigQuery row dict.
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                values))
        return row


def run(argv=None):
    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())

    (p
     # source_file: list of gs:// file paths/patterns (defined elsewhere)
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # ignore the CSV header
     # s is each of the string elements read by ReadAllFromText; map it to a BigQuery row dict
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))
     | 'Write to BigQuery' >> beam.io.Write(
         beam.io.WriteToBigQuery(
             'DUMMY',
             dataset='test',
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))

    result = p.run()
    result.wait_until_finish()

However, I need two additional columns to be ingested for every row in the CSV file, namely op_type and op_time. They appear as follows in the BigQuery table definition:

Field name      Type        Mode
ID              FLOAT       REQUIRED
CLUSTERED       FLOAT       NULLABLE
SCATTERED       FLOAT       NULLABLE
RANDOMISED      FLOAT       NULLABLE
RANDOM_STRING   STRING      NULLABLE
SMALL_VC        STRING      NULLABLE
PADDING         STRING      NULLABLE
op_type         INTEGER     REQUIRED
op_time         TIMESTAMP   REQUIRED
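For reference, the same definition expressed as a Beam-style schema string would look like the sketch below. It is not actually needed in my pipeline, because the table already exists and CREATE_NEVER is used; it simply mirrors the table above in the 'name:TYPE,name:TYPE' form that WriteToBigQuery accepts via its schema argument.

# Sketch only: mirrors the existing table definition; not required with CREATE_NEVER.
TABLE_SCHEMA = (
    'ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,'
    'RANDOM_STRING:STRING,SMALL_VC:STRING,PADDING:STRING,'
    'op_type:INTEGER,op_time:TIMESTAMP'
)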

In PySpark I can achieve this by adding two columns to the DataFrame as below:

from pyspark.sql.functions import current_timestamp, lit

df = self.spark.createDataFrame(rdd, schema=Schema)
df = df. \
         withColumn("op_type", lit(1)). \
         withColumn("op_time", current_timestamp())

So op_type is set to 1, meaning an insert, and op_time needs to be the current timestamp (current_timestamp()).

How could this be achieved with Dataflow? These two columns are added on top of what is in the CSV, so presumably the 'String To BigQuery Row' step should reflect that?

Thanks


Solution

  • This worked:

        from datetime import datetime

        def parse_method(self, string_input):
            # Parse the CSV line into the original seven columns as before.
            values = re.split(",", re.sub('\r\n', '', re.sub('"', '', string_input)))
            row = dict(
                zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING', 'SMALL_VC', 'PADDING'),
                    values))
            # Add the two extra columns to every row before it reaches WriteToBigQuery.
            Timestamp = datetime.now()
            static_cols = {'op_type': 1, 'op_time': Timestamp}
            row.update(static_cols)
            return row


    Note the definition of Timestamp in Python, Timestamp = datetime.now(), which maps correctly to a column of TIMESTAMP type in BigQuery.
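    If a raw datetime object ever gives the sink trouble, a variation (just a sketch, not something I needed here) is to send op_time as a string, which BigQuery also accepts for TIMESTAMP columns:

        # Variation (sketch only): emit op_time as an ISO 8601 string instead of a
        # datetime object; BigQuery accepts such strings for TIMESTAMP columns.
        from datetime import datetime, timezone

        static_cols = {'op_type': 1,  # 1 = insert, as in the question
                       'op_time': datetime.now(timezone.utc).isoformat()}
        row.update(static_cols)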