Search code examples
pythongoogle-cloud-dataflowapache-beam

Apache Beam Pipeline Query Table After Writing Table


I have a Apache Beam/Dataflow pipeline that is writing results to a BigQuery table. I would then like to query this table for a separate portion of the pipeline. However, I can't seem to figure out how to properly set up this pipeline dependency. The new table that I write (and then want to query) is left joined with a separate table for some filtering logic and that is why I actually need to write the table and then run the query. The logic would be as follows:

with beam.Pipeline(options=pipeline_options) as p:
    table_data = p | 'CreatTable' >> # ... logic to generate table ...

    # Write Table to BQ
    table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...)

    query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))

if query_new_table is actually a query of an already existing BQ table and I change to query_results = p | instead of table_written this works properly. However, if I try to query the table that I am writing in the middle of the pipeline then I cannot get the pipeline step to "wait" until that table has actually been generated. Is there any way to do this that I am overlooking?

When I try to make this step sequential, I am getting an assertion error assert isinstance(pbegin, pvalue.PBegin) AssertionError which I am reading to mean that table_written is the issue as it is not a valid PCollection instance.

Does anybody know what I would could put in place of table_written to make this actually run sequentially as desired?


Solution

  • The use case "do something after a BigQuery write is complete" is not supported by Beam currently. The only workaround is to run separate pipelines: have your main program be: run the pipeline that writes to BigQuery; wait for the pipeline to finish; run another pipeline that reads from BigQuery.

    This is a very frequently requested feature and we're beginning to design this support (more generally, upgrading various IO writes to support sequencing operations after them), but I don't know when we'll be done.