I am trying to write all the elements of a Pub/Sub message (data, attributes, messageId and publish_time) to BigQuery using Apache Beam, and I want the table to look like this:

| data | attr | key | publishTime |
|------|------|-----|-------------|
| data | attr | key | publishTime |
I am currently using the following piece of code to transform the message and write it to the table shown above:
(demo
 | "Decoding Pub/Sub Message" + input_subscription >> beam.Map(lambda r: data_decoder.decode_base64(r))
 | "Parsing Pub/Sub Message" + input_subscription >> beam.Map(lambda r: data_parser.parseJsonMessage(r))
 | "Write to BigQuery Table" + input_subscription >> io.WriteToBigQuery(
     '{0}:{1}'.format(project_name, dest_table_id),
     schema=schema,
     write_disposition=io.BigQueryDisposition.WRITE_APPEND,
     create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED))
I want to store the data in its encoded form: a column named data holding the value of element.data, plus the values for the rest of the columns.
Thanks in advance!
I hope this helps. Here is an example based on your use case: I mocked a PubsubMessage with Beam in a unit test:
def test_beam_pubsub_to_bq(self):
    with TestPipeline() as p:
        message = PubsubMessage(
            data=b'{"test" : "value"}',
            attributes={'label': 'label'},
            message_id='message33444',
            publish_time=datetime.datetime.now()
        )

        result = (p
                  | beam.Create([message])
                  | 'Map' >> beam.Map(self.to_bq_element))

        result | "Print outputs" >> beam.Map(log_element)

def to_bq_element(self, message: PubsubMessage):
    return {
        'data': message.data,
        'attr': json.dumps(message.attributes),
        'key': message.message_id,
        # Format the message's own publish_time (utcnow() would return the current time instead).
        'publishTime': message.publish_time.strftime("%Y-%m-%d %H:%M:%S")
    }
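The `log_element` function used in the "Print outputs" step is not shown here; a minimal pass-through logger (a hypothetical helper, its name assumed from the snippet) could look like this:

```python
import logging

def log_element(element):
    # Log the element and return it unchanged, so the step
    # can sit in the middle of a pipeline without altering data.
    logging.info(element)
    return element
```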
I transformed the PubsubMessage to a Dict in order to write the element to BigQuery, and got the following result:
{
'data': b'{"test" : "value"}',
'attr': '{"label": "label"}',
'key': 'message33444',
'publishTime': '2022-10-07 09:37:33'
}
- For `data` I used Python `bytes`; I think the matching type in BigQuery is `BYTES`: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
- For `attr` I used a JSON String (I initially had a Dict)
- `key` is a String
- `publishTime` is a String containing the formatted timestamp
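One pitfall worth noting for `publishTime`: `datetime.utcnow()` is a classmethod, so calling it through `publish_time` returns the current wall-clock time instead of the message's timestamp. Formatting the `publish_time` value directly keeps the intended field; a quick sketch (the fixed datetime stands in for `PubsubMessage.publish_time`):

```python
import datetime

# Stand-in for PubsubMessage.publish_time in this sketch.
publish_time = datetime.datetime(2022, 10, 7, 9, 37, 33)

# strftime on the instance preserves the message's own timestamp;
# publish_time.utcnow() would silently ignore it.
formatted = publish_time.strftime("%Y-%m-%d %H:%M:%S")
print(formatted)  # → 2022-10-07 09:37:33
```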
Don't hesitate to adapt this example to fit your needs.
The result Dict element produced in the Beam transformation must exactly match the schema of the BigQuery table.
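As a sketch, a schema string matching the Dict above could look like this (the types are assumptions based on the values in this example; adapt them to your actual table):

```python
# One field per key of the Dict produced by to_bq_element, in
# WriteToBigQuery's comma-separated "name:TYPE" string form.
schema = "data:BYTES,attr:STRING,key:STRING,publishTime:TIMESTAMP"

# Sanity check: the field names line up with the Dict's keys.
field_names = [f.split(":")[0] for f in schema.split(",")]
print(field_names)  # → ['data', 'attr', 'key', 'publishTime']
```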
The example with the BigQueryIO write:
def test_beam_pubsub_to_bq(self):
    with TestPipeline() as p:
        message = PubsubMessage(
            data=b'{"test" : "value"}',
            attributes={'label': 'label'},
            message_id='message33444',
            publish_time=datetime.datetime.now()
        )

        (p
         | beam.Create([message])
         | 'Map' >> beam.Map(self.to_bq_element)
         | "Print outputs" >> beam.Map(log_element)
         | "Write to BigQuery Table" + input_subscription >> io.WriteToBigQuery(
             '{0}:{1}'.format(project_name, dest_table_id),
             schema=schema,
             write_disposition=io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=io.BigQueryDisposition.CREATE_IF_NEEDED))

def to_bq_element(self, message: PubsubMessage):
    return {
        'data': message.data,
        'attr': json.dumps(message.attributes),
        'key': message.message_id,
        # Format the message's own publish_time (utcnow() would return the current time instead).
        'publishTime': message.publish_time.strftime("%Y-%m-%d %H:%M:%S")
    }