I have created three processing functions and want to save the data to BigQuery, but this is not working for me.
Below is my custom ParDo:
subscription = "pub/sub/writeToBigquery"
dataset_table1 = "A"
dataset_table2 = "B"
dataset_table3 = "c"
Schema1 = "et:timestamp,name:string,Day:integer"
schema2 = "et:timestamp,Place:string,Month:integer"
schema3 = "et:timestamp,Location:string,Year:integer"

class CustomParsing(beam.DoFn):
    def process1(self, element, timestamp=beam.DoFn.TimestampParam):
        parsed = dict()
        parsed = json.loads(element.decode("utf-8"))
        parsed["et"] = parsed.et,
        parsed["name"] = parsed.name
        parsed["Day"] = parsed.day
        yield parsed

    def process2(self, element, timestamp=beam.DoFn.TimestampParam):
        parsed = dict()
        parsed = json.loads(element.decode("utf-8"))
        parsed["et"] = parsed.et,
        parsed["name"] = parsed.place
        parsed["Day"] = parsed.month
        yield parsed

    def process3(self, element, timestamp=beam.DoFn.TimestampParam):
        parsed = dict()
        parsed = json.loads(element.decode("utf-8"))
        parsed["et"] = parsed.et,
        parsed["name"] = parsed.location
        parsed["Day"] = parsed.year
        yield parsed
How do I write this to BigQuery?
You can do that with three sinks, for example:
import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


def process1(element):
    """Map a Pub/Sub message to a row dict for BigQuery table 1."""
    parsed = json.loads(element.decode("utf-8"))
    return {"et": parsed["et"], "name": parsed["name"], "Day": parsed["day"]}


def process2(element):
    """Map a Pub/Sub message to a row dict for BigQuery table 2."""
    parsed = json.loads(element.decode("utf-8"))
    return {"et": parsed["et"], "Place": parsed["place"], "Month": parsed["month"]}


def process3(element):
    """Map a Pub/Sub message to a row dict for BigQuery table 3."""
    parsed = json.loads(element.decode("utf-8"))
    return {"et": parsed["et"], "Location": parsed["location"], "Year": parsed["year"]}


def main() -> None:
    logging.getLogger().setLevel(logging.INFO)
    # Pub/Sub is an unbounded source, so the pipeline must run in streaming mode.
    pipeline_options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=pipeline_options) as p:
        # Single source: read the raw messages once from the subscription
        # (full path: projects/<project>/subscriptions/<name>).
        result_pcollection_pub_sub = (
            p | 'Read from pub sub' >> ReadFromPubSub(subscription='input_subscription'))

        # Flow 1: map to the table 1 row format and sink to BigQuery table 1.
        (result_pcollection_pub_sub
         | 'Map BQ table 1' >> beam.Map(process1)
         | 'Write to BQ table 1' >> beam.io.WriteToBigQuery(
             project='project_id',
             dataset='dataset',
             table='table1',
             method='STREAMING_INSERTS',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

        # Flow 2: same source PCollection, different mapping and destination table.
        (result_pcollection_pub_sub
         | 'Map BQ table 2' >> beam.Map(process2)
         | 'Write to BQ table 2' >> beam.io.WriteToBigQuery(
             project='project_id',
             dataset='dataset',
             table='table2',
             method='STREAMING_INSERTS',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))

        # Flow 3: same source PCollection, third mapping and destination table.
        (result_pcollection_pub_sub
         | 'Map BQ table 3' >> beam.Map(process3)
         | 'Write to BQ table 3' >> beam.io.WriteToBigQuery(
             project='project_id',
             dataset='dataset',
             table='table3',
             method='STREAMING_INSERTS',
             write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
             create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER))


if __name__ == "__main__":
    main()
The `PCollection` is the result of the input from Pub/Sub. The same `PCollection` is then used by three flows, each one ending in its own BigQuery table:

Flow 1 => Map to BQ table 1 => Sink result to BQ table 1 with `BigqueryIO`
Flow 2 => Map to BQ table 2 => Sink result to BQ table 2 with `BigqueryIO`
Flow 3 => Map to BQ table 3 => Sink result to BQ table 3 with `BigqueryIO`

If you would rather keep a single custom DoFn as in your original attempt, see the tagged-outputs sketch just after this list.
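As a sketch only (not part of the original answer): a single DoFn with tagged outputs can parse each message once and emit one element per destination table; the tag names and field mappings below are assumptions based on the schemas in your question.

```python
import json

import apache_beam as beam


class CustomParsing(beam.DoFn):
    """Parse each Pub/Sub message once and emit one tagged output per table."""

    def process(self, element, timestamp=beam.DoFn.TimestampParam):
        parsed = json.loads(element.decode("utf-8"))
        # The tags "table1"/"table2"/"table3" are arbitrary labels chosen for this sketch.
        yield beam.pvalue.TaggedOutput(
            "table1", {"et": parsed["et"], "name": parsed["name"], "Day": parsed["day"]})
        yield beam.pvalue.TaggedOutput(
            "table2", {"et": parsed["et"], "Place": parsed["place"], "Month": parsed["month"]})
        yield beam.pvalue.TaggedOutput(
            "table3", {"et": parsed["et"], "Location": parsed["location"], "Year": parsed["year"]})


# Inside the pipeline, split the stream once and write each tagged output to its own sink:
# outputs = result_pcollection_pub_sub | 'Parse' >> beam.ParDo(CustomParsing()).with_outputs(
#     "table1", "table2", "table3")
# outputs.table1 | 'Write to BQ table 1' >> beam.io.WriteToBigQuery(...)
# outputs.table2 | 'Write to BQ table 2' >> beam.io.WriteToBigQuery(...)
# outputs.table3 | 'Write to BQ table 3' >> beam.io.WriteToBigQuery(...)
```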
In this example I used STREAMING_INSERTS for ingestion to the BigQuery tables, but you can adapt and change it if needed in your case.
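For instance, if the tables do not already exist, one hedged variation is to pass the schema strings declared in the question and let the sink create the tables, or to switch the ingestion method to batch file loads; the uppercase field types and the 60-second triggering frequency below are assumptions to adjust for your case.

```python
import apache_beam as beam

# Sketch of one adapted sink: create the table from the declared schema if needed,
# and ingest via periodic load jobs instead of streaming inserts.
write_table1 = beam.io.WriteToBigQuery(
    project='project_id',
    dataset='dataset',
    table='table1',
    schema='et:TIMESTAMP,name:STRING,Day:INTEGER',  # Schema1 from the question, BQ type names
    method='FILE_LOADS',
    triggering_frequency=60,  # seconds between load jobs in a streaming pipeline (assumed value)
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
```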