google-cloud-platform, google-bigquery, google-cloud-dataflow, apache-beam, google-cloud-pubsub

How to process my Pub/Sub message object and write all objects into BigQuery in Apache Beam using Python?


I am trying to write all the elements of a Pub/Sub message (data, attributes, messageId and publish_time) to BigQuery using Apache Beam, and I want the resulting table to look like:

data | attr | key | publishTime
data | attr | key | publishTime

I am currently using the following piece of code to transform the message, and I want to save it to the table shown above:

(demo
    | "Decoding Pub/Sub Message" + input_subscription >> beam.Map(lambda r: data_decoder.decode_base64(r))
    | "Parsing Pub/Sub Message" + input_subscription >> beam.Map(lambda r: data_parser.parseJsonMessage(r))
    | "Write to BigQuery Table" + input_subscription >> io.WriteToBigQuery(
          '{0}:{1}'.format(project_name, dest_table_id),
          schema=schema,
          write_disposition=io.BigQueryDisposition.WRITE_APPEND,
          create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED))

I want to keep the data encoded, stored in a column named data with value element.data, alongside the values for the rest of the columns.

Thanks in advance!


Solution

  • I hope this helps.

    Here is an example based on your use case: I mocked a PubsubMessage in a Beam unit test:

    import datetime
    import json

    import apache_beam as beam
    from apache_beam.io.gcp.pubsub import PubsubMessage
    from apache_beam.testing.test_pipeline import TestPipeline

    def test_beam_pubsub_to_bq(self):
        with TestPipeline() as p:
            # Mock a Pub/Sub message with data, attributes, message id
            # and publish time.
            message = PubsubMessage(
                data=b'{"test" : "value"}',
                attributes={'label': 'label'},
                message_id='message33444',
                publish_time=datetime.datetime.now()
            )

            result = (p
                      | beam.Create([message])
                      | 'Map' >> beam.Map(self.to_bq_element))
            result | "Print outputs" >> beam.Map(log_element)

    def to_bq_element(self, message: PubsubMessage):
        # The Dict keys must match the BigQuery column names.
        return {
            'data': message.data,
            'attr': json.dumps(message.attributes),
            'key': message.message_id,
            # strftime on the message's own publish time; the original
            # utcnow() call ignored it and returned the current time instead.
            'publishTime': message.publish_time.strftime("%Y-%m-%d %H:%M:%S")
        }
    
    • Create a mocked PubsubMessage
    • Map the PubsubMessage to a Dict in order to write the element to BigQuery (the log_element print helper is sketched just below)
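
    The log_element helper isn't part of the snippet above; a minimal sketch (the name and behaviour are assumptions; it just logs the element and passes it through):

    import logging

    def log_element(element):
        # Hypothetical helper: log the element and return it unchanged so
        # the PCollection keeps flowing to downstream transforms.
        logging.info(element)
        return element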

    I get the following result:

    {
       'data': b'{"test" : "value"}', 
       'attr': '{"label": "label"}', 
       'key': 'message33444', 
       'publishTime': '2022-10-07 09:37:33'
    }
    

    Don't hesitate to adapt this example to your needs.

    The Dict element produced by the Beam transformation must exactly match the schema of the BigQuery table.
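
    For example, a schema string matching the Dict above could look like this (the column types are my assumptions based on the values produced; adjust them to your actual table):

    # Hypothetical schema for the data/attr/key/publishTime columns.
    schema = 'data:BYTES,attr:STRING,key:STRING,publishTime:TIMESTAMP'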

    The same example with the BigQueryIO write:

    # Same imports as above, plus the IO module:
    from apache_beam import io

    def test_beam_pubsub_to_bq(self):
        with TestPipeline() as p:
            message = PubsubMessage(
                data=b'{"test" : "value"}',
                attributes={'label': 'label'},
                message_id='message33444',
                publish_time=datetime.datetime.now()
            )

            # project_name, dest_table_id and schema are assumed to be
            # defined elsewhere, as in the question.
            (p
               | beam.Create([message])
               | 'Map' >> beam.Map(self.to_bq_element)
               | "Print outputs" >> beam.Map(log_element)
               | "Write to BigQuery Table" >> io.WriteToBigQuery(
                     '{0}:{1}'.format(project_name, dest_table_id),
                     schema=schema,
                     write_disposition=io.BigQueryDisposition.WRITE_APPEND,
                     create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED))

    def to_bq_element(self, message: PubsubMessage):
        return {
            'data': message.data,
            'attr': json.dumps(message.attributes),
            'key': message.message_id,
            # strftime on the message's own publish time, not utcnow().
            'publishTime': message.publish_time.strftime("%Y-%m-%d %H:%M:%S")
        }
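
    In your real streaming pipeline you would replace beam.Create with a Pub/Sub read. A minimal sketch, assuming input_subscription, project_name, dest_table_id and schema are defined as in your question, and to_bq_element is the same mapping written as a module-level function (with_attributes=True is what makes data, attributes, message_id and publish_time available as a PubsubMessage):

    import apache_beam as beam
    from apache_beam import io
    from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True  # Pub/Sub reads are unbounded

    with beam.Pipeline(options=options) as p:
        (p
           # Yields PubsubMessage objects instead of raw bytes.
           | "Read Pub/Sub" >> io.ReadFromPubSub(subscription=input_subscription,
                                                 with_attributes=True)
           | "To BQ element" >> beam.Map(to_bq_element)
           | "Write to BigQuery Table" >> io.WriteToBigQuery(
                 '{0}:{1}'.format(project_name, dest_table_id),
                 schema=schema,
                 write_disposition=io.BigQueryDisposition.WRITE_APPEND,
                 create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED))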