python google-cloud-dataflow apache-beam

How to modify Dataflow RuntimeValueProvider args while passing them to a ParDo?


I'm running into an issue when trying to modify the value of a RuntimeValueProvider argument in an Apache Beam Dataflow pipeline. Here's a simplified version of my code:

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions)


class DataflowFlags(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        "--output_path",
        dest="output_path",
        type=str,
        default="./python_extract_output",
    )
    parser.add_argument(
        "--project_id", dest="project_id", default="test"
    )


class ExtractPipelineRunner:

  def __init__(
      self,
      output_path: str,
  ):
    self.output_path = output_path

  def run(self, p: beam.Pipeline) -> None:
    _ = (
        p
        | "Create" >> beam.Create(["hello", "world"])
        | "WriteToText" >> WriteToText(self.output_path.get() + "test/")
    )


def main() -> None:
  pipeline_options = PipelineOptions()
  known_args = pipeline_options.view_as(DataflowFlags)
  pipeline_options.view_as(GoogleCloudOptions).project = known_args.project_id

  with beam.Pipeline(options=pipeline_options) as p:
    extract_runner = ExtractPipelineRunner(known_args.output_path)
    extract_runner.run(p)


if __name__ == "__main__":
  main()

I'm trying to modify the output_path by appending an extra value to it before passing it to WriteToText, but I encounter the following error:

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: output_path, type: str, default_value: './python_extract_output').get() not called from a runtime context

How can I solve this issue? I want to be able to modify the RuntimeValueProvider arguments appropriately. Thank you for your help!

I want to trigger a Dataflow template every month and dynamically append the current month (i.e. in place of "test/") as a string to the output path. I tried using a StaticValueProvider, but its value doesn't update from month to month.


Solution

  • Have you looked into using NestedValueProvider? (documentation, Python implementation)

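    NestedValueProvider wraps an existing ValueProvider (here, output_path) together with a translator function, so you can derive a modified path from it, for example by appending the month. The translator is only applied when .get() is called at runtime, so nothing is resolved while the pipeline or template is being constructed.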