Tags: airflow, dataflow, google-cloud-composer

How to pass parameters from Google Composer to a Dataflow template


I'm trying to pass a parameter from Google Composer into a Dataflow template in the following way, but it does not work.

# composer code
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplateOperator

trigger_dataflow = DataflowTemplateOperator(
    task_id="trigger_dataflow",
    template="gs://mybucket/my_template",
    dag=dag,
    job_name="appsflyer_events_daily",
    parameters={
        # {{ ds }} is rendered by Airflow to the DAG run's execution date
        "input": "gs://my_bucket/{{ ds }}/*.gz",
    },
)

# template code
import apache_beam as beam
from apache_beam.io.fileio import MatchFiles
from apache_beam.options.pipeline_options import PipelineOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            default='gs://my_bucket/*.gz',
            help='path of input file')


def main():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    p = beam.Pipeline(options=pipeline_options)
    lines = (
        p
        | MatchFiles(user_options.input)
    )
    p.run()

Solution

  • Moving from a Dataflow classic template to a Flex Template fixed the issue. A classic template builds the pipeline graph when the template is created, so runtime parameters only reach transforms that explicitly accept ValueProvider arguments; a Flex Template builds the graph at launch time, so the parameters passed by Composer behave like ordinary pipeline options. A sketch of both sides after the switch is shown below.
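
As a rough sketch of the Flex Template setup (not from the original post): the DAG switches to DataflowStartFlexTemplateOperator from the Google provider, and the pipeline reads --input as a normal option. The project ID, region, and the spec path gs://my_bucket/templates/my_template.json are placeholders; the body layout assumes the flexTemplates.launch request format used by the provider, and it assumes the operator templates the body so {{ ds }} renders as in the original DAG.

# composer code (flex template)
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
)

trigger_dataflow = DataflowStartFlexTemplateOperator(
    task_id="trigger_dataflow",
    project_id="my-project",      # placeholder
    location="us-central1",       # placeholder
    dag=dag,
    body={
        "launchParameter": {
            "jobName": "appsflyer_events_daily",
            # path to the Flex Template spec file (placeholder path)
            "containerSpecGcsPath": "gs://my_bucket/templates/my_template.json",
            "parameters": {
                "input": "gs://my_bucket/{{ ds }}/*.gz",
            },
        }
    },
)

# template code (flex template) - parameters arrive as ordinary options
import apache_beam as beam
from apache_beam.io.fileio import MatchFiles
from apache_beam.options.pipeline_options import PipelineOptions


class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # plain add_argument instead of add_value_provider_argument:
        # the graph is built at launch time, so no ValueProvider is needed
        parser.add_argument(
            '--input',
            default='gs://my_bucket/*.gz',
            help='path of input file')


def main():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    p = beam.Pipeline(options=pipeline_options)
    lines = (
        p
        | MatchFiles(user_options.input)
    )
    p.run()

The spec file and container image referenced by containerSpecGcsPath are built ahead of time with `gcloud dataflow flex-template build`, before the DAG is triggered.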