I'm encountering an issue while attempting to modify the arguments passed to a RuntimeValueProvider
in an Apache Beam Dataflow pipeline. Here's a simplified version of my code:
import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
PipelineOptions)
class DataflowFlags(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
"--output_path",
dest="output_path",
type=str,
default="./python_extract_output",
)
parser.add_argument(
"--project_id", dest="project_id", default="test"
)
class ExtractPipelineRunner:
def __init__(
self,
output_path: str,
):
self.output_path = output_path
def run(self, p: beam.Pipeline) -> None:
_ = (
p
| "Create" >> beam.Create(["hello", "world"])
| "WriteToText" >> WriteToText(self.output_path.get() + "test/")
)
def main() -> None:
pipeline_options = PipelineOptions()
known_args = pipeline_options.view_as(DataflowFlags)
pipeline_options.view_as(GoogleCloudOptions).project = known_args.project_id
with beam.Pipeline(options=pipeline_options) as p:
extract_runner = ExtractPipelineRunner(known_args.output_path)
extract_runner.run(p)
if __name__ == "__main__":
main()
I'm trying to modify the output_path by appending an extra value to it before passing it to WriteToText
, but I encounter the following error:
apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: output_path, type: str, default_value: './python_extract_output').get() not called from a runtime context
How can I solve this issue? I want to be able to modify the RuntimeValueProvider
arguments appropriately. Thank you for your help!
I want to dynamically trigger a Dataflow template every month and append the current month(i.e. in place of "test/") as a string to the output path. I've attempted to achieve this by using a StaticValueProvider, but it's not updating every month.
Have you looked into using the NestedValueProvider
? (documentation, Python implementation).
This should allow you to take the original ValueProvider
output_path
and modify it to include the month using a translator function.