
How to set FileIO writeDynamic name with input fields?

I'm using Dataflow to load CSV data into Google Cloud Storage, and I need to write the CSV files into different directories based on values in the data (such as uuid, region, etc.).

How can I do this? I can already put the key (from the KV) in the path, but I also need other information that is currently only available in the values.

The code below currently saves data to gs://my-bucket/<uuid>/extraction.csv, but I need something like gs://my-bucket/<uuid>/<region>/<store>/extraction.

Example csv:



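.apply("Write CSV files",
        FileIO.<String, KV<String, String>>writeDynamic()
                .by(KV::getKey) // assumed: the excerpt omits by(), to() and the coder
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                .to("gs://my-bucket")
                .withDestinationCoder(StringUtf8Coder.of())
                // the suffix is appended as-is, so it needs the leading dot
                .withNaming(key -> FileIO.Write.defaultNaming(String.format("%s/extraction", key), ".csv")));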


  • The function you pass to withNaming only receives the destination key, so <region> and <store> need to be part of the key as well. You can then generate the right path in that function.
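
As a rough illustration of that suggestion, here is a minimal sketch of a composite key and the prefix built from it. It uses plain Java only. The class and method names (DestinationPaths, destinationKey, extractionPrefix) and the sample values are hypothetical and not part of Apache Beam, and it assumes none of the three fields contains a "/".

```java
/**
 * Sketch: carry uuid, region and store in one composite destination key,
 * then turn that key into the file-name prefix used by the naming function.
 * All names here are hypothetical helpers, not Apache Beam APIs.
 */
class DestinationPaths {

    // Joins the three fields into a single key; assumes none of them contains '/'.
    static String destinationKey(String uuid, String region, String store) {
        return String.join("/", uuid, region, store);
    }

    // Builds the prefix that would be handed to FileIO.Write.defaultNaming(prefix, ".csv").
    static String extractionPrefix(String destinationKey) {
        return String.format("%s/extraction", destinationKey);
    }

    public static void main(String[] args) {
        String key = destinationKey("1234", "emea", "store-7");
        System.out.println(extractionPrefix(key)); // prints 1234/emea/store-7/extraction
    }
}
```

In the pipeline this would mean producing the composite key as the destination (either by rekeying the KV upstream or inside by()) and calling FileIO.Write.defaultNaming(extractionPrefix(key), ".csv") inside withNaming. How region and store are read out of each CSV line depends on the file layout, which is not shown in the question.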