python, google-bigquery, pipeline, apache-beam, google-cloud-pubsub

How do I save messages from one Pub/Sub subscription to three different BigQuery tables in Apache Beam in Python?


I have created three process methods and want to save the data, but this is not working for me.

Below is my custom ParDo:


subscription = "pub/sub/writeToBigquery"
dataset_table1="A"
dataset_table2="B"
dataset_table3="c"
Schema1="et:timestamp,name:string,Day:integer"
schema2="et:timestamp,Place:string,Month:integer"
schema3="et:timestamp,Location:string,Year:integer"

    class CustomParsing(beam.DoFn):

        def process1(self, element, timestamp=beam.DoFn.TimestampParam):
            parsed = dict()
            parsed = json.loads(element.decode("utf-8"))
            parsed["et"] = parsed.et,
            parsed["name"] = parsed.name
            parsed["Day"] = parsed.day
            yield parsed

        def process2(self, element, timestamp=beam.DoFn.TimestampParam):
            parsed = dict()
            parsed = json.loads(element.decode("utf-8"))
            parsed["et"] = parsed.et,
            parsed["name"] = parsed.place
            parsed["Day"] = parsed.month
            yield parsed

        def process3(self, element, timestamp=beam.DoFn.TimestampParam):
            parsed = dict()
            parsed = json.loads(element.decode("utf-8"))
            parsed["et"] = parsed.et,
            parsed["name"] = parsed.location
            parsed["Day"] = parsed.year
            yield parsed

How do I write this to BigQuery?


Solution

  • You can do that with 3 sinks, for example:

    import json
    import logging

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import ReadFromPubSub
    from apache_beam.options.pipeline_options import PipelineOptions


    # Plain functions that return a dict, so they can be used with beam.Map.
    def process1(element):
        # Parse the Pub/Sub payload and keep the fields expected by table 1.
        parsed = json.loads(element.decode("utf-8"))
        return {"et": parsed["et"], "name": parsed["name"], "Day": parsed["day"]}


    def process2(element):
        # Keep the fields expected by table 2.
        parsed = json.loads(element.decode("utf-8"))
        return {"et": parsed["et"], "Place": parsed["place"], "Month": parsed["month"]}


    def process3(element):
        # Keep the fields expected by table 3.
        parsed = json.loads(element.decode("utf-8"))
        return {"et": parsed["et"], "Location": parsed["location"], "Year": parsed["year"]}


    def main() -> None:
        logging.getLogger().setLevel(logging.INFO)

        # Reading from Pub/Sub makes this a streaming pipeline.
        pipeline_options = PipelineOptions(streaming=True)

        with beam.Pipeline(options=pipeline_options) as p:

            result_pcollection_pub_sub = p | 'Read from pub sub' >> ReadFromPubSub(
                subscription='input_subscription')

            (result_pcollection_pub_sub
             | 'Map BQ table 1' >> beam.Map(process1)
             | 'Write to BQ table 1' >> beam.io.WriteToBigQuery(
                        project='project_id',
                        dataset='dataset',
                        table='table1',
                        method='STREAMING_INSERTS',
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

            (result_pcollection_pub_sub
             | 'Map BQ table 2' >> beam.Map(process2)
             | 'Write to BQ table 2' >> beam.io.WriteToBigQuery(
                        project='project_id',
                        dataset='dataset',
                        table='table2',
                        method='STREAMING_INSERTS',
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

            (result_pcollection_pub_sub
             | 'Map BQ table 3' >> beam.Map(process3)
             | 'Write to BQ table 3' >> beam.io.WriteToBigQuery(
                        project='project_id',
                        dataset='dataset',
                        table='table3',
                        method='STREAMING_INSERTS',
                        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))


    if __name__ == "__main__":
        main()
    
    • The first PCollection is the input read from Pub/Sub.
    • The goal is then to apply 3 separate transformations to this input PCollection.
    • Each flow applies one of your transformations and sinks the result to its destination BigQuery table with `BigQueryIO` (a note on table schemas follows this list):
    Flow 1 => Map to BQ table 1 => Sink result to BQ table 1 with `BigQueryIO`
    Flow 2 => Map to BQ table 2 => Sink result to BQ table 2 with `BigQueryIO`
    Flow 3 => Map to BQ table 3 => Sink result to BQ table 3 with `BigQueryIO`
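
    Since the sinks above use `CREATE_NEVER`, the three BigQuery tables must already exist. If you would rather let the sink create them from the schema strings defined in the question, a minimal sketch for table 1 (the `project_id:dataset.table1` table spec is a placeholder) could look like this:

    import apache_beam as beam

    # Assumption: reuse the question's schema string so BigQueryIO can create
    # the table on first write instead of requiring it to exist already.
    schema1 = "et:TIMESTAMP,name:STRING,Day:INTEGER"

    write_table1 = beam.io.WriteToBigQuery(
        table='project_id:dataset.table1',  # placeholder "project:dataset.table" spec
        schema=schema1,
        method='STREAMING_INSERTS',
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)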
    

    In this example I used `STREAMING_INSERTS` for ingestion to the BigQuery tables, but you can adapt and change the write method if needed in your case.
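
    For instance, if you prefer periodic load jobs over streaming inserts, one possible variant (the 300-second frequency and the table spec below are assumptions, and the pipeline needs a GCS temp location for the temporary load files) is:

    import apache_beam as beam

    # Assumption: batch rows into a BigQuery load job every 300 seconds instead
    # of using streaming inserts; on an unbounded (streaming) source,
    # triggering_frequency is required when using FILE_LOADS.
    write_with_file_loads = beam.io.WriteToBigQuery(
        table='project_id:dataset.table1',  # placeholder "project:dataset.table" spec
        method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
        triggering_frequency=300,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER)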