I am trying to create a Dataflow job that runs daily with Cloud Scheduler. I need to get the data from an external API using GET requests, so I need the current date as an input. However, when I export the Dataflow job as a template for scheduling, the date input is fixed at the time the template is created and is not updated on each daily run. I have been looking around for a solution and came across ValueProvider, but my pipeline, starting with apache_beam.transforms.Create, always returns the error 'RuntimeValueProvider(option: test, type: str, default_value: 'killme').get() not called from a runtime context' when the ValueProvider is not specified.
Is there any way I can overcome this? It seems like such a simple problem, yet I cannot make it work no matter what I try. I would appreciate any ideas!
You can use the ValueProvider interface to pass runtime parameters to your pipeline. The error you are seeing happens because transforms such as beam.Create are evaluated when the pipeline graph is constructed, before any runtime value exists, so .get() cannot be called there. To access the value within a DoFn, pass the ValueProvider in as a constructor parameter and only call .get() inside process(), which runs at execution time. Similar to the following example from here:
import logging

import apache_beam as beam
from apache_beam.options.value_provider import RuntimeValueProvider

class LogValueProvidersFn(beam.DoFn):
    def __init__(self, string_vp):
        self.string_vp = string_vp

    # Define the DoFn that logs the ValueProvider value.
    # process() is called when the pipeline branch is executed,
    # so .get() runs in a runtime context.
    # This example logs the ValueProvider value, but
    # you could store it by pushing it to an external database.
    def process(self, an_int):
        logging.info('The string_value is %s' % self.string_vp.get())

        # Another option (where you don't need to pass the value at all) is:
        logging.info(
            'The string value is %s' %
            RuntimeValueProvider.get_value('string_value', str, ''))

# p is the Pipeline and my_options the custom options object defined
# earlier; a single dummy element is enough to trigger the DoFn.
(p
 | beam.Create([None])
 | 'LogValueProvs' >> beam.ParDo(
     LogValueProvidersFn(my_options.string_value)))
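For this to work, the option also has to be declared with add_value_provider_argument so that it is exposed as a template runtime parameter. A minimal sketch, where MyOptions is just an illustrative class name and string_value matches the option used above:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # add_value_provider_argument makes the option a ValueProvider,
        # so its value is supplied when the template is launched rather
        # than when the template is created.
        parser.add_value_provider_argument(
            '--string_value',
            type=str,
            default='',
            help='A runtime string parameter, e.g. the current date.')

my_options = PipelineOptions().view_as(MyOptions)

Cloud Scheduler can then pass the current date as the string_value parameter each time it launches the template.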
You may also want to have a look at Flex templates:
https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
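With a Flex template the pipeline graph is built when the job is launched, not when the template is created, so ordinary options work and you can even compute the current date directly in the pipeline code. A minimal sketch under that assumption (the run() entry point and the step name are illustrative):

import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run():
    options = PipelineOptions()
    # In a Flex template this code executes at launch time, so the
    # date below is re-evaluated on every scheduled run.
    today = datetime.date.today().isoformat()
    with beam.Pipeline(options=options) as p:
        (p
         | beam.Create([today])
         | 'LogRunDate' >> beam.Map(
             lambda d: logging.info('Run date: %s', d)))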