
Use ValueProvider to format a BigQuery query in Dataflow


I am currently working with Dataflow to do recurrent batch processing in Python.

Basically, I read data from BigQuery and do some processing on it. My pipeline looks like this:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
             | "doing stuff" >> beam.Map(do_some_stuff)
             )
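
(do_some_stuff stands for my actual processing; for this question you can think of it as a trivial, hypothetical stub, just to keep the snippets self-contained:)

def do_some_stuff(value):
    # Hypothetical stand-in for the real processing logic.
    return value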

I want to run the jobs using a Dataflow Template, so that I can adapt the pipeline at runtime.

Thanks to the documentation (https://cloud.google.com/dataflow/docs/guides/templates/creating-templates, the "Using ValueProvider in your functions" section), I managed to give "do_some_stuff" an extra runtime argument using a ParDo:


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)


class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
        self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
        # .get() is called inside process(), i.e. at pipeline execution
        # time, so the runtime value of the parameter is used.
        yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY,
                                                                       use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

But I also want to change the number of users covered by the process, so I need to adapt the query itself at runtime:


class TemplateOption(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--template_nb_users',
                                           default=100,
                                           type=int)
        parser.add_value_provider_argument('--template_do_stuff_param',
                                           default=45,
                                           type=int)


class MyDoStuffFn(beam.DoFn):
    def __init__(self, template_do_stuff_param):
        self.template_do_stuff_param = template_do_stuff_param

    def process(self, *_):
        yield do_some_stuff(self.template_do_stuff_param.get())


pipeline_options = PipelineOptions()
p = beam.Pipeline(options=pipeline_options)
template_option = pipeline_options.view_as(TemplateOption)

lines = (p
             | 'read_big_query' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY.format(nb_users=template_option.template_nb_users.get()),
                                                                       use_standard_sql=True))
             | "doing stuff" >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

This does not work, because I call get() before pipeline execution. So far, I have not managed to adapt what I did for the do_some_stuff function to the "Read" line.

Any advice or solution on how to proceed would be most appreciated. Thanks!


Solution

  • Unfortunately, the BigQuerySource does not support value providers. This is because it is implemented natively in the Dataflow runner, and thus all the information needs to be available at pipeline construction time.

    You can try out the transform apache_beam.io.gcp.bigquery.ReadFromBigQuery instead - it does allow you to use value providers, as sketched below.
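
    For example, here is a minimal sketch of how the pipeline above could be rewritten (it assumes the TemplateOption and MyDoStuffFn classes from the question; the QUERY string and the gs://my-bucket/tmp staging path are placeholders). A NestedValueProvider wraps the nb_users ValueProvider so the query is only formatted at runtime:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.value_provider import NestedValueProvider

    # Placeholder query with the runtime parameter as a format field.
    QUERY = 'SELECT * FROM `my_project.my_dataset.users` LIMIT {nb_users}'

    pipeline_options = PipelineOptions()
    p = beam.Pipeline(options=pipeline_options)
    template_option = pipeline_options.view_as(TemplateOption)

    # The translator function runs only when the runner calls .get() at
    # execution time, so the query is formatted with the runtime value.
    query_provider = NestedValueProvider(
        value=template_option.template_nb_users,
        translator=lambda nb_users: QUERY.format(nb_users=nb_users))

    lines = (p
             | 'read_big_query' >> beam.io.ReadFromBigQuery(
                 query=query_provider,
                 use_standard_sql=True,
                 gcs_location='gs://my-bucket/tmp')  # placeholder export location
             | 'doing stuff' >> beam.ParDo(MyDoStuffFn(template_option.template_do_stuff_param))
             )

    Unlike BigQuerySource, ReadFromBigQuery exports the query results to GCS and reads them back from there, which is why it can resolve the query at runtime - and why it needs a gcs_location (or a pipeline temp_location) for the export files.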