I'm trying to pass a parameter from Google Cloud Composer into a Dataflow template in the following way, but it does not work.
# composer code
# Launch the Dataflow template, pointing it at the files under the run date's folder.
trigger_dataflow = DataflowTemplateOperator(
    task_id="trigger_dataflow",
    template="gs://mybucket/my_template",
    dag=dag,
    job_name='appsflyer_events_daily',
    parameters={
        # One plain literal: the Jinja macro `{{ ds }}` must reach Airflow
        # untouched, so no f-string prefix or concatenation is needed.
        # NOTE(review): relies on `parameters` being a templated field of
        # the operator so `{{ ds }}` is rendered at task run time - confirm
        # for the installed Airflow version.
        "input": "gs://my_bucket/{{ ds }}/*.gz",
    },
)
# template code
class UserOptions(PipelineOptions):
    """Pipeline options that add the ``--input`` file-pattern argument."""

    @classmethod
    def _add_argparse_args(cls, parser):
        """Register ``--input`` on *parser* as a value-provider argument."""
        input_arg_settings = {
            'default': 'gs://my_bucket/*.gz',
            'help': 'path of input file',
        }
        parser.add_value_provider_argument('--input', **input_arg_settings)
class _ResolveInputPattern(beam.DoFn):
    """Emit the file pattern held by a ValueProvider once the job is running."""

    def __init__(self, pattern_provider):
        # Keep the provider itself; its value is not readable while the
        # template graph is being constructed.
        self._pattern_provider = pattern_provider

    def process(self, unused_element):
        # .get() is called here, at execution time, which is when a
        # runtime-supplied template parameter becomes available.
        yield self._pattern_provider.get()


def main():
    """Build the pipeline that matches the files named by ``--input``.

    ``user_options.input`` is a ValueProvider (it is declared with
    ``add_value_provider_argument``), not a string. It is therefore resolved
    inside a DoFn at run time and the resulting pattern is fed to
    ``MatchAll``, rather than being handed to ``MatchFiles`` while the graph
    is built.
    """
    # Local import so this block does not depend on the file's import list.
    from apache_beam.io.fileio import MatchAll

    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions)
    p = beam.Pipeline(options=pipeline_options)
    # NOTE(review): the visible snippet never runs the pipeline and never
    # consumes `lines`; presumably the rest of the pipeline follows - confirm.
    lines = (
        p
        # A single dummy element so the resolver DoFn fires exactly once.
        | 'Seed' >> beam.Create([None])
        | 'ResolveInputPattern' >> beam.ParDo(
            _ResolveInputPattern(user_options.input))
        | 'MatchFiles' >> MatchAll()
    )
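Moving from a Dataflow classic template to a Flex Template fixed the issue. In a classic template the pipeline graph is built once, when the template is created, so a runtime parameter is only available through a ValueProvider, which `MatchFiles` does not accept; a Flex Template builds the graph at launch time, when the parameter value is already known.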