templates, apache-beam, dataflow, apache-beam-io, value-provider

How to create a Beam template with the current date as an input (updated daily) [Create from GET request]


I am trying to create a Dataflow job that runs daily with Cloud Scheduler. I need to fetch data from an external API using GET requests, so I need the current date as an input. However, when I export the Dataflow job as a template for scheduling, the date input stays fixed at the value it had when the template was created and is not updated daily. I have been looking for a solution and came across ValueProvider, but my pipeline, which starts with apache_beam.transforms.Create, always returns the error 'RuntimeValueProvider(option: test, type: str, default_value: 'killme').get() not called from a runtime context' when the ValueProvider is not specified.

Is there any way I can overcome this? It seems like such a simple problem, yet I cannot make it work no matter what I try. I would really appreciate any ideas!


Solution

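  • You can use the ValueProvider interface to pass runtime parameters to your pipeline. To access the value within a DoFn, pass the ValueProvider in as a constructor parameter and call .get() inside process(). The .get() call only works at runtime, so calling it while the pipeline is being constructed (for example, to build the input of beam.Create) raises the 'not called from a runtime context' error. The approach is similar to the following example from here: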

    https://beam.apache.org/documentation/patterns/pipeline-options/#retroactively-logging-runtime-parameters

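    import logging
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.value_provider import RuntimeValueProvider

    class MyOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        # Register --string_value as a runtime parameter (a ValueProvider).
        parser.add_value_provider_argument('--string_value', type=str)

    class LogValueProvidersFn(beam.DoFn):
      def __init__(self, string_vp):
        self.string_vp = string_vp

      # Define the DoFn that logs the ValueProvider value.
      # The DoFn is called when creating the pipeline branch.
      # This example logs the ValueProvider value, but
      # you could store it by pushing it to an external database.
      def process(self, an_int):
        logging.info('The string_value is %s' % self.string_vp.get())
        # Another option (where you don't need to pass the value at all) is:
        logging.info(
            'The string value is %s' %
            RuntimeValueProvider.get_value('string_value', str, ''))

    pipeline_options = PipelineOptions()
    my_options = pipeline_options.view_as(MyOptions)

    with beam.Pipeline(options=pipeline_options) as p:
      _ = (
          p
          | beam.Create([None])
          | 'LogValueProvs' >> beam.ParDo(
              LogValueProvidersFn(my_options.string_value)))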
    

    You may also want to have a look at Flex Templates:

    https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates
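
Applied to the daily-date case in the question, the same idea might look like the rough sketch below: seed the pipeline with a constant element and work out the date inside a function that runs on the workers, so nothing date-related is baked into the template. The option name --run_date, the helper names, and the https://example.com/api endpoint are placeholders I made up for illustration, not part of the original pipeline. Passing the ValueProvider as an extra argument to FlatMap is an assumption on my part; the documented pattern is to store it on a DoFn as in the example above.

```python
import datetime


def resolve_run_date(date_override, today=None):
    """Return the override if one was supplied, else today's date in ISO format."""
    if date_override:
        return date_override
    return (today or datetime.date.today()).isoformat()


def build_request_urls(_, date_vp, base_url):
    """Yield one request URL. Runs at execution time, so date_vp.get() is allowed."""
    run_date = resolve_run_date(date_vp.get())
    yield f"{base_url}?date={run_date}"


def run(argv=None):
    # Beam is imported lazily here only so the helpers above can be
    # unit-tested on a machine without apache_beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

    class TemplateOptions(PipelineOptions):
        @classmethod
        def _add_argparse_args(cls, parser):
            # Hypothetical runtime parameter: leave empty to use the current date.
            parser.add_value_provider_argument("--run_date", type=str, default="")

    options = PipelineOptions(argv)
    # Make module-level imports (datetime) available to the pickled functions.
    options.view_as(SetupOptions).save_main_session = True
    template_options = options.view_as(TemplateOptions)

    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | "Seed" >> beam.Create([None])  # constant seed, no date in the graph
            | "BuildUrls" >> beam.FlatMap(
                build_request_urls,
                template_options.run_date,  # ValueProvider, resolved on the worker
                "https://example.com/api")  # placeholder endpoint
            | "Log" >> beam.Map(print)
        )


if __name__ == "__main__":
    run()
```

With this shape, the scheduled job can launch the template without any date parameter and each run picks up the worker's current date, while passing run_date explicitly should still allow re-running a specific day. The actual GET request would go in a step after BuildUrls.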