Tags: google-cloud-platform, google-bigquery, google-cloud-dataflow, apache-beam, audit-logging

Insert data from the same file into separate BigQuery tables with different schemas using Dataflow (Apache Beam) side outputs


I have a requirement wherein I need to develop an audit mechanism. For example, there is a JSON file named emp.json:

{
    "magic": "atMSG",
    "type": "DT",
    "headers": null,
    "messageschemaid": null,
    "messageschema": null,
    "message": {
        "data": {
            "ID": "data_in_quotes",
            "NAME": "data_in_quotes",
            "SALARY": "data_in_quotes"
        },
        "beforeData": null,
        "headers": {
            "operation": "INSERT",
            "changeSequence": "20200822230048000000000017887787417",
            "timestamp": "2020-08-22T23:00:48.000",
            "transactionId": "some_id"
        }
    }
}

First, I need to insert the data into the BigQuery table Staging.emp:

ID, NAME, SALARY
1, ABC, 20000
2, XYZ, 30000

I also need to insert the ID, a timestamp (the load timestamp), and the filename into a separate table, misc-dataset.Audit_table:

ID, TIMESTAMP, FILENAME
1, 28-08-2020 22:55, emp.json
2, 28-08-2020 22:55, emp.json

For now I am writing to files to test this. Once it is solved, I will use

| "WriteToBigQuery" >> beam.io.WriteToBigQuery(
                        "{0}:{1}.emp_data".format(projectId, datasetId),
                        schema=table_schema,
                        #write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )
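
and, once it works, a second sink along the same lines for the audit table. A sketch of what I have in mind (the schema string and the auditDatasetId variable are assumptions on my part):

| "WriteAuditToBigQuery" >> beam.io.WriteToBigQuery(
                        "{0}:{1}.Audit_table".format(projectId, auditDatasetId),
                        schema="ID:STRING,TIMESTAMP:TIMESTAMP,FILENAME:STRING",  # assumed column types
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                        )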

I am unable to get this working with side outputs using the code below.

class InsertIntoBQAndAudit(beam.DoFn):
    
    def process(self, element):
        norm = json_normalize(json.loads(element), max_level=1)
        l = norm["message.data"]
        return l
    
    def process(self, element):
        norm1 = json_normalize(json.loads(element), max_level=1)
        l1 = norm1["message.data"]
        
        l1['ID']
        l1['TIMESTAMP':datetime.now()]
        l1.[m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        return [beam.pvalue.TaggedOutput('Audit', l1)]

options = PipelineOptions()
p = beam.Pipeline(options=options)

data_from_source = (p
                    | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/bill_document_prod_data_sample_records")
                     | "Send to Different Tags" >> beam.ParDo(InsertIntoBQAndAudit()).with_outputs('Audit', main='Load')
                   )

Audit_col = data_from_source.Audit
Load_col = data_from_source.Load

Load_col | "Write to Actual Table" >> WriteToText("gs://ti-project-1/output/data_output")
Audit_col | "Write to Audit Table" >> WriteToText("gs://ti-project-1/misc-temp/audit_out")

p.run()

Solution

  • A DoFn cannot have two process methods. Instead, create two separate DoFns and apply each to the same PCollection, e.g.

    class Audit(beam.DoFn):
      def process(self, l1):
        l1['ID']  # the ID is already present on the record
        l1['TIMESTAMP'] = datetime.now()
        # The line below seems to have gotten mangled in transit; presumably it
        # records the matched filename from the GCS match metadata.
        l1['FILENAME'] = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
        yield l1
    
    with beam.Pipeline(options=options) as p:
      data_from_source = p | "READ FROM JSON" >> ReadFromText("gs://...")
      parsed_data = data_from_source | beam.Map(
          lambda element: json_normalize(json.loads(element), max_level=1)["message.data"])
      
      Load_col = parsed_data
      Audit_col = parsed_data | beam.ParDo(Audit())
      ...
    

    You could use a multi-output DoFn for this, which would look like

    class ParseAndAudit(beam.DoFn):
        
        def process(self, element):
            norm = json_normalize(json.loads(element), max_level=1)
            l = norm["message.data"]
            yield l  # emit the main output
        
            # continue in the same process method
            l1 = l
            
            l1['ID']  # the ID is already present on the record
            l1['TIMESTAMP'] = datetime.now()
            # As above, this line seems to have gotten mangled in transit;
            # presumably it records the matched filename.
            l1['FILENAME'] = [m.metadata_list for m in gcs.match(['gs://ti-project-1/emp*'])]
            yield beam.pvalue.TaggedOutput('Audit', l1)
    

    and then use it as you have above, but that's probably more complicated to write and maintain.
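
    For completeness, routing both outputs into BigQuery could look like the sketch below. The table names come from your question; the schema strings and the projectId variable are assumptions, so treat this as illustrative rather than tested:

    results = (p
               | "READ FROM JSON" >> ReadFromText("gs://ti-project-1/input/emp.json")
               | "Parse and Audit" >> beam.ParDo(ParseAndAudit()).with_outputs('Audit', main='Load'))

    # Main output: employee rows go to the staging table.
    results.Load | "Write emp" >> beam.io.WriteToBigQuery(
        "{0}:Staging.emp".format(projectId),
        schema="ID:STRING,NAME:STRING,SALARY:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)

    # Tagged side output: audit rows go to the audit table.
    results.Audit | "Write audit" >> beam.io.WriteToBigQuery(
        "{0}:misc-dataset.Audit_table".format(projectId),
        schema="ID:STRING,TIMESTAMP:TIMESTAMP,FILENAME:STRING",
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)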