python · google-cloud-platform · pipeline · apache-beam · dataflow

Dynamic paths with Apache Beam and runtime parameters


I am creating a pipeline TEMPLATE which takes an input file and counts the words in it. All works fine up to this point, but I need to pass another parameter (from the function where I call the template) with the name of the file, so I can build an output path from it.

I'll show you an example of what I want. I know pipelines can't access runtime parameters during pipeline construction or outside a runtime context, but this should give you an idea of what I need to do:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class tempatableTest(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
            '--input',
            type=str,
            help='path to the input file'
        )
        parser.add_value_provider_argument(
            '--fdinamic',
            type=str,
            help='folder name'
        )

beam_options = PipelineOptions()
templatable_test = beam_options.view_as(tempatableTest)
input = templatable_test.input
dinamicName = templatable_test.fdinamic.get()  # raises: .get() called outside a runtime context

with beam.Pipeline(options=beam_options) as p:
    lines = p | beam.io.ReadFromText(input)
    line_count = lines | beam.combiners.Count.Globally()
    line_count | 'countTotalLen' >> beam.io.WriteToText(
        f'gs://bucket-test-out/processedFile/{dinamicName}/count.txt')

If I use templatable_test.fdinamic.get() I get the runtime error, but if I remove the .get() the folder is named after the ValueProvider object's string representation instead of the actual value (a super long name).

I know this probably isn't the way to go, but it's just to illustrate what I need to do. Thank you for your help.


Solution

  • Unfortunately, the WriteToText transform can't be used for this: it currently only supports a fixed destination. To write files to dynamic destinations, you would instead need the utilities from the fileio module, which does support them, although this means switching to the experimental WriteToFiles transform; see the sketch below.
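
Here is a minimal sketch of that approach, reusing the --input and --fdinamic template parameters and the bucket path from the question (the beam.Map(str) step, the lambdas, and the file-naming choice are illustrative assumptions, not the only way to wire this up). The key point is that WriteToFiles evaluates its destination callable at execution time, so calling .get() on the ValueProvider inside it is allowed:

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions

class tempatableTest(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--input', type=str, help='path to the input file')
        parser.add_value_provider_argument('--fdinamic', type=str, help='folder name')

beam_options = PipelineOptions()
opts = beam_options.view_as(tempatableTest)

with beam.Pipeline(options=beam_options) as p:
    counts = (
        p
        | beam.io.ReadFromText(opts.input)
        | beam.combiners.Count.Globally()
        | beam.Map(str)  # TextSink writes strings, one per line
    )
    counts | fileio.WriteToFiles(
        path='gs://bucket-test-out/processedFile/',  # fixed base path
        # The destination callable runs per record at execution time,
        # so resolving the ValueProvider with .get() is legal here.
        destination=lambda record: opts.fdinamic.get(),
        sink=lambda dest: fileio.TextSink(),
        # Prefixes each output file's name with the destination string.
        file_naming=fileio.destination_prefix_naming(suffix='-count.txt'),
    )

When the template is launched, the runtime value is supplied as usual (e.g. --fdinamic=myFolder). Note that destination_prefix_naming uses the destination as a file-name prefix rather than a subfolder; since GCS object names are flat, returning a value containing a slash (for example opts.fdinamic.get() + '/count', hypothetical) would reproduce the per-folder layout from the question.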