I'm running into an issue writing tagged PCollections to multiple destination tables in BigQuery. The pipeline executes with no errors, but no data gets written. If I execute the pipeline without TaggedOutput, the PCollection elements are correctly generated and correctly written to the BQ table on their own (albeit a single table instead of multiple). So I believe the issue is a misunderstanding of how TaggedOutput actually works?
Code
I have a process fn which generates tagged output:
    import apache_beam as beam
    from apache_beam.pvalue import TaggedOutput

    class ProcessFn(beam.DoFn):
        def process(self, el):
            if el > 5:
                yield TaggedOutput('more_than_5', el)
            else:
                yield TaggedOutput('less_than_5', el)
And the pipeline:
    with beam.Pipeline(options=beam_options) as p:
        # Read the table rows into a PCollection.
        results = (
            p
            | "read" >> beam.io.ReadFromBigQuery(table=args.input_table, use_standard_sql=True)
            | "process rows" >> beam.ParDo(ProcessFn()).with_outputs(
                'more_than_5',
                main='less_than_5')
        )

        results.less_than_5 | "write to bq 1" >> beam.io.WriteToBigQuery(
            'dataset.less_than_5',
            schema=less_than_5_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

        results.more_than_5 | "write to bq 2" >> beam.io.WriteToBigQuery(
            'dataset.more_than_5',
            schema=more_than_5_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
The with_outputs(main=...) keyword argument is used for elements yielded without a TaggedOutput. Since your ProcessFn wraps every element in a TaggedOutput, nothing is routed as a plain (untagged) main output. In this case, you should probably be writing with_outputs('more_than_5', 'less_than_5'). Either accessing the result by name or unpacking it as a tuple should work.
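Here is a minimal sketch of the corrected wiring, reusing the ProcessFn, schemas, and table names from your question:

    with beam.Pipeline(options=beam_options) as p:
        results = (
            p
            | "read" >> beam.io.ReadFromBigQuery(
                table=args.input_table, use_standard_sql=True)
            # Declare both tags explicitly; main= is only needed for
            # elements yielded without a TaggedOutput wrapper.
            | "process rows" >> beam.ParDo(ProcessFn()).with_outputs(
                'more_than_5', 'less_than_5')
        )

        # Access each tagged PCollection by attribute...
        results.less_than_5 | "write to bq 1" >> beam.io.WriteToBigQuery(
            'dataset.less_than_5',
            schema=less_than_5_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
        results.more_than_5 | "write to bq 2" >> beam.io.WriteToBigQuery(
            'dataset.more_than_5',
            schema=more_than_5_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

        # ...or unpack the result as a tuple, in the order the tags
        # were declared:
        # more_than_5, less_than_5 = results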