I have an Apache Beam/Dataflow pipeline that writes results to a BigQuery table. I would then like to query this table in a separate portion of the pipeline. However, I can't seem to figure out how to properly set up this pipeline dependency. The new table that I write (and then want to query) is left joined against a separate table for some filtering logic, which is why I actually need to write the table first and then run the query. The logic would be as follows:
with beam.Pipeline(options=pipeline_options) as p:
    table_data = p | 'CreateTable' >> # ... logic to generate table ...
    # Write table to BQ
    table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)
    query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))
If query_new_table is actually a query against an already existing BQ table, and I change this to query_results = p | ... instead of rooting the read at table_written, it works properly, as in the snippet below. However, if I try to query the table that I am writing in the middle of the same pipeline, I cannot get that step to "wait" until the table has actually been generated. Is there any way to do this that I am overlooking?
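For reference, the variant that works against a pre-existing table looks like this:

    # Reading from BigQuery must be rooted at the pipeline (a PBegin),
    # not at a PCollection, which is why this version runs:
    query_results = p | 'QueryNewTable' >> beam.io.Read(
        beam.io.BigQuerySource(query=query_new_table))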
When I try to make this step sequential, I get an assertion error: assert isinstance(pbegin, pvalue.PBegin) AssertionError, which I read to mean that table_written is the issue, as it is not a valid PCollection instance. Does anybody know what I could put in place of table_written to make this actually run sequentially as desired?
The use case "do something after a BigQuery write is complete" is not currently supported by Beam. The only workaround is to run separate pipelines: have your main program run the pipeline that writes to BigQuery, wait for that pipeline to finish, and then run another pipeline that reads from BigQuery.
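A minimal sketch of that workaround, keeping the elided table-generation logic and WriteToBigQuery arguments from your question as placeholders:

    import apache_beam as beam

    # Pipeline 1: generate the table and write it to BigQuery.
    p1 = beam.Pipeline(options=pipeline_options)
    table_data = p1 | 'CreateTable' >> ...  # ... logic to generate table ...
    table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)
    # Block until the write pipeline has finished, so the table exists.
    p1.run().wait_until_finish()

    # Pipeline 2: read the freshly written table.
    p2 = beam.Pipeline(options=pipeline_options)
    query_results = p2 | 'QueryNewTable' >> beam.io.Read(
        beam.io.BigQuerySource(query=query_new_table))
    # ... downstream join / filtering logic ...
    p2.run().wait_until_finish()

Note that a "with beam.Pipeline(...) as p:" block also runs the pipeline and waits on exit, so you could equally write each pipeline in its own with block.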
This is a very frequently requested feature and we're beginning to design this support (more generally, upgrading various IO writes to support sequencing operations after them), but I don't know when we'll be done.