I have a requirement where I need to develop an audit mechanism. For example, there is a JSON file named emp.json:
{
  "magic": "atMSG",
  "type": "DT",
  "headers": null,
  "messageschemaid": null,
  "messageschema": null,
  "message": {
    "data": {
      "ID": "data_in_quotes",
      "NAME": "data_in_quotes",
      "SALARY": "data_in_quotes"
    },
    "beforeData": null,
    "headers": {
      "operation": "INSERT",
      "changeSequence": "20200822230048000000000017887787417",
      "timestamp": "2020-08-22T23:00:48.000",
      "transactionId": "some_id"
    }
  }
}
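To be explicit about what I need from each record, the nested "message.data" block is what goes to the staging table. A minimal parsing sketch outside Beam, using plain json just for illustration:

import json

with open("emp.json") as f:
    record = json.load(f)

row = record["message"]["data"]   # {'ID': ..., 'NAME': ..., 'SALARY': ...}
print(row)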
I need to insert the data into a BigQuery table first: Staging.emp
ID, NAME, SALARY
1, ABC, 20000
2, XYZ, 30000
I also need to insert the ID, the loading timestamp, and the filename into a separate table: misc-dataset.Audit_table
ID, TIMESTAMP, FILENAME
1, 28-08-2020 22:55, emp.json
2, 28-08-2020 22:55, emp.json
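For reference, I expect the two target schemas to look roughly like the following WriteToBigQuery schema strings (the field types are my assumption from the sample rows):

# Assumed schema strings for the two target tables
emp_table_schema = "ID:INTEGER,NAME:STRING,SALARY:INTEGER"
audit_table_schema = "ID:INTEGER,TIMESTAMP:TIMESTAMP,FILENAME:STRING"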
For now I am writing to a file to test it. Once this is solved, I will use:
| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
"{0}:{1}.emp_data".format(projectId, datasetId),
schema=table_schema,
#write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
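and, once the side output works, a second sink along these lines for the audit rows (table name from above, schema as sketched earlier; this is only my intended usage, not tested yet):

| "WriteAuditToBigQuery" >> beam.io.WriteToBigQuery(
    "{0}:misc-dataset.Audit_table".format(projectId),
    schema=audit_table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)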
I am unable to get this working using a side output with the code below.
class InsertIntoBQAndAudit(beam.DoFn):

    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        return l

    def process(self, element):
        norm1 = json_normalize(json.loads(element), max_level=1)
        l1 = norm1["message.data"]
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        return [beam.pvalue.TaggedOutput('Audit', l1)]
options = PipelineOptions()
p = beam.Pipeline(options=options)

data_from_source = (p
    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
    | "Send to Different Tags" >> beam.ParDo(InsertIntoBQAndAudit()).with_outputs('Audit', main='Load')
)

Audit_col = data_from_source.Audit
Load_col = data_from_source.Load

Load_col | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
Audit_col | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")

p.run()
A DoFn cannot have two process methods. Instead, create two separate DoFns and apply each to the same PCollection, e.g.
class Audit(beam.DoFn):
    def process(self, l1):
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        # This line above seems to have gotten mangled in transit...
        l1 = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield l1
with beam.Pipeline(options=options) as p:
    data_from_source = p | "READ FROM JSON" >> ReadFromText("gs://...")
    parsed_data = data_from_source | beam.Map(
        lambda element: json_normalize(json.loads(element), max_level=1)["message.data"])
    Load_col = parsed_data
    Audit_col = parsed_data | beam.ParDo(Audit())
    ...
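A fuller sketch of that two-DoFn shape, using plain json.loads instead of json_normalize and building the audit record explicitly. The ID/TIMESTAMP/FILENAME fields and bucket paths come from your question; the filename handling here is an assumption (I pass it in as a constant; to get it per element you could read with apache_beam.io.ReadFromTextWithFilename instead):

import json
from datetime import datetime

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class ParseMessage(beam.DoFn):
    """Extract the nested message.data dict from one JSON line."""
    def process(self, element):
        record = json.loads(element)
        yield record["message"]["data"]


class BuildAuditRecord(beam.DoFn):
    """Build the audit row: ID, load timestamp, source filename."""
    def __init__(self, filename):
        # Filename passed in as a constant placeholder here.
        self.filename = filename

    def process(self, row):
        yield {
            "ID": row["ID"],
            "TIMESTAMP": datetime.now().isoformat(),
            "FILENAME": self.filename,
        }


options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    parsed = (p
        | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
        | "Parse" >> beam.ParDo(ParseMessage())
    )

    # Main (load) output
    parsed | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")

    # Audit output derived from the same parsed PCollection
    (parsed
        | "Build Audit" >> beam.ParDo(BuildAuditRecord("emp.json"))
        | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")
    )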
You could use a multi-output DoFn for this, which would look like
class ParseAndAudit(beam.DoFn):
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        yield l  # emit the main output

        # continue in the same process method
        l1 = l
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1 = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield beam.pvalue.TaggedOutput('Audit', l1)
and then use it as you have above, but that's probably more complicated to write and maintain.
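For completeness, the wiring for the multi-output version stays essentially what you already have (a sketch reusing the tag names and bucket paths from your question):

outputs = (p
    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
    | "Send to Different Tags" >> beam.ParDo(ParseAndAudit()).with_outputs('Audit', main='Load')
)

outputs.Load | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
outputs.Audit | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")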