I have Dataflow code that reads a CSV file from a gs:// storage bucket and ingests it into a BigQuery table. The BigQuery table already exists. The code below works fine.
import re

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class DataIngestion:
    """A helper class which contains the logic to translate the source CSV file into a format BigQuery will accept."""

    def parse_method(self, string_input):
        # Strip double quotes and the trailing newline, then split on commas.
        values = re.split(',', re.sub('\r\n', '', re.sub('"', '', string_input)))
        row = dict(
            zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING',
                 'SMALL_VC', 'PADDING'),
                values))
        return row


def run(argv=None):
    data_ingestion = DataIngestion()
    p = beam.Pipeline(options=PipelineOptions())
    (p
     | 'Create PCollection' >> beam.Create(source_file)
     | 'Read from a File' >> beam.io.ReadAllFromText(skip_header_lines=1)  # ignore the CSV header
     | 'String To BigQuery Row' >> beam.Map(lambda s: data_ingestion.parse_method(s))  # s is each string element produced by the ReadAllFromText transform
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         'DUMMY',
         dataset='test',
         create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    result = p.run()
    result.wait_until_finish()
However, I need two additional columns to be ingested for every row of the CSV file, namely op_type and op_time. They are defined in the BigQuery table as follows:
Field name      Type       Mode
ID              FLOAT      REQUIRED
CLUSTERED       FLOAT      NULLABLE
SCATTERED       FLOAT      NULLABLE
RANDOMISED      FLOAT      NULLABLE
RANDOM_STRING   STRING     NULLABLE
SMALL_VC        STRING     NULLABLE
PADDING         STRING     NULLABLE
op_type         INTEGER    REQUIRED
op_time         TIMESTAMP  REQUIRED
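For reference, the same schema can be written in the compact 'FIELD:TYPE' string form that beam.io.WriteToBigQuery accepts for its schema parameter. It is not needed here (the table already exists and CREATE_NEVER is set), and note the string form does not carry the REQUIRED/NULLABLE modes:

# Equivalent schema string; only relevant if the pipeline were creating the table itself.
TABLE_SCHEMA = (
    'ID:FLOAT,CLUSTERED:FLOAT,SCATTERED:FLOAT,RANDOMISED:FLOAT,'
    'RANDOM_STRING:STRING,SMALL_VC:STRING,PADDING:STRING,'
    'op_type:INTEGER,op_time:TIMESTAMP'
)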
In PySpark I can achieve this by adding the two columns to the DataFrame as below:
from pyspark.sql.functions import lit, current_timestamp

df = self.spark.createDataFrame(rdd, schema=Schema)
df = (df
      .withColumn("op_type", lit(1))
      .withColumn("op_time", current_timestamp()))
So op_type is set to 1, meaning an insert, and op_time needs to be the current timestamp.
How could this be achieved with Dataflow? Since these two columns are added on top of the CSV data, I assume the 'String To BigQuery Row' step should somehow reflect that?
Thanks
This worked:
import re
from datetime import datetime

def parse_method(self, string_input):
    # Parse the CSV line as before: strip quotes and the trailing newline, split on commas.
    values = re.split(',', re.sub('\r\n', '', re.sub('"', '', string_input)))
    row = dict(
        zip(('ID', 'CLUSTERED', 'SCATTERED', 'RANDOMISED', 'RANDOM_STRING',
             'SMALL_VC', 'PADDING'),
            values))
    # Append the two extra columns to every row.
    Timestamp = datetime.now()
    static_cols = {'op_type': 1, 'op_time': Timestamp}
    row.update(static_cols)
    return row
Note the definition of Timestamp in Python, Timestamp = datetime.now(), which maps correctly to a column of TIMESTAMP type in BigQuery.
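If passing a raw datetime object ever gives serialization trouble with the BigQuery sink, a variant I did not need here (just a sketch) is to send the timestamp as an ISO-8601 string, which BigQuery also accepts for TIMESTAMP columns, pinned to UTC so the worker's local timezone does not matter:

from datetime import datetime, timezone

# e.g. '2021-03-01T12:00:00.123456+00:00'; BigQuery parses this into a TIMESTAMP value.
static_cols = {'op_type': 1, 'op_time': datetime.now(timezone.utc).isoformat()}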