Tags: file-io, google-cloud-dataflow, apache-beam

How to set FileIO writeDynamic name with input fields?


I'm using Dataflow to write CSV data to Google Cloud Storage, and I need to save the files into different directories based on values in the data (uuid, region, and so on).

How can I do this? I can already put the key (from the KV) in the path, but I also need other fields that are currently only available in the value.

The code below currently writes to gs://my-bucket/<uuid>/extraction.csv, but I need something like gs://my-bucket/<uuid>/<region>/<store>/extraction.

Example CSV:

uuid,region,store,....

123e4567-e89b-12d3-a456-426614174000,central,store1,foo,bar

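.apply("Write CSV files",
        FileIO.<String, KV<String, String>>writeDynamic()
                // Route each element by its KV key (the uuid)
                .by(KV::getKey)
                .to("gs://my-bucket")
                .withDestinationCoder(StringUtf8Coder.of())
                .withNumShards(1)
                // Write only the KV value (the CSV line) as text
                .via(Contextful.fn(KV::getValue), TextIO.sink())
                // defaultNaming appends the suffix verbatim, so it includes the dot
                .withNaming(key -> FileIO.Write.defaultNaming(String.format("%s/extraction", key), ".csv"))
);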

Solution

  • Make <region> and <store> part of the key as well (for example, a composite key built from uuid, region, and store), then build the full path from that key in the function you pass to withNaming.
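
One way to sketch that idea is a small helper that derives the composite destination from a CSV line. The class and method names (DestinationPaths, destinationOf, filePrefix) are hypothetical and not part of Beam. The sketch assumes the first three columns are uuid, region, and store, as in the example row, and that fields contain no quoted commas.

```java
// Hypothetical helper: names are illustrative and not part of Apache Beam.
final class DestinationPaths {
    private DestinationPaths() {}

    // Builds "<uuid>/<region>/<store>" from a CSV line.
    // Assumption: the first three columns are uuid, region, store,
    // and no field contains a quoted comma.
    static String destinationOf(String csvLine) {
        String[] fields = csvLine.split(",", -1);
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected at least 3 columns: " + csvLine);
        }
        return String.join("/",
                fields[0].trim(), fields[1].trim(), fields[2].trim());
    }

    // File name prefix intended for FileIO.Write.defaultNaming(prefix, suffix).
    static String filePrefix(String destination) {
        return destination + "/extraction";
    }
}
```

In the pipeline from the question, this could plausibly be wired in as .by(kv -> DestinationPaths.destinationOf(kv.getValue())) and .withNaming(dest -> FileIO.Write.defaultNaming(DestinationPaths.filePrefix(dest), ".csv")). The destination stays a String, so StringUtf8Coder.of() should still work as the destination coder. Alternatively, the same composite string can be used as the KV key where the KV is first created, leaving .by(KV::getKey) unchanged.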