java apache-flink flink-streaming

Flink: How to pass a dynamic path while writing to files using writeAsText(path)?


Let's say I have a DataStream with elements of type String. I want to write each element of the stream to a separate file in some folder. I'm using the following setup:

stream.writeAsText(path).setParallelism(1);

How do I make this path dynamic? I even tried appending System.nanoTime() to the path, but it still doesn't work: everything gets written to a single file.
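Appending System.nanoTime() cannot help because Java evaluates the argument once, when writeAsText(...) is called while the job is being defined, not once per element. The plain-Java sketch below (no Flink needed) reproduces that; `FixedPathSink` is a hypothetical stand-in for any sink that receives its path when it is created.

```java
import java.util.ArrayList;
import java.util.List;

public class FixedPathDemo {
    // Hypothetical stand-in for a sink whose target path is fixed at creation time,
    // the way writeAsText(path) receives its path while the job graph is built.
    static class FixedPathSink {
        private final String path;
        final List<String> targets = new ArrayList<>();

        FixedPathSink(String path) {
            this.path = path;
        }

        void write(String element) {
            // Every element is routed to the same, already-computed path
            targets.add(path);
        }
    }

    public static void main(String[] args) {
        // System.nanoTime() runs exactly once here, before any element is seen
        FixedPathSink sink = new FixedPathSink("/tmp/out-" + System.nanoTime() + ".txt");
        for (String element : new String[] {"a", "b", "c"}) {
            sink.write(element);
        }
        // Prints 1: all three elements were routed to a single path
        System.out.println(sink.targets.stream().distinct().count());
    }
}
```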


Solution

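  • Your problem is that DataStream.writeAsText() takes a single path that is fixed when the job is defined, and every element of the stream is written to that path. Any expression inside the path, such as System.nanoTime(), is evaluated only once, so with parallelism 1 you will only ever get a single file.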

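    The documentation's flatMap example is a possible starting point: flatMap returns a new DataStream (not a collection) in which each input element can produce zero or more output records. Note that it only transforms records; on its own it does not write anything to separate files.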

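    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.util.Collector;

    DataStream<String> words = dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            // Emit one output record per space-separated word of the input
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    });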
    

    Taken straight from the documentation here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/datastream_api.html
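To actually get one file per element, the path has to be chosen inside the sink, once per element, rather than once when the job is defined. Below is a minimal plain-Java sketch of that per-element logic, assuming each element should land in its own file under a base directory; `PerElementFileWriter` and `writeElement` are hypothetical names, not Flink API. In a Flink job you would call something like `writeElement` from the `invoke` method of a custom `SinkFunction` (for example a `RichSinkFunction`). In newer Flink releases, `StreamingFileSink`/`FileSink` with a custom `BucketAssigner` is the built-in way to route elements to different directories by content, although it still groups several records into each part file rather than writing one file per element.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.stream.Stream;

// Hypothetical helper: writes every element to its own file under baseDir.
// Intended to be called once per element, e.g. from a custom sink's invoke().
public class PerElementFileWriter {
    private final Path baseDir;

    public PerElementFileWriter(Path baseDir) throws IOException {
        this.baseDir = baseDir;
        Files.createDirectories(baseDir);
    }

    // The file name is computed here, per element, so every call gets a new file.
    // A random UUID avoids the collisions System.nanoTime() alone could still produce
    // when elements (or parallel sink instances) are written in quick succession.
    public Path writeElement(String element) throws IOException {
        Path target = baseDir.resolve("element-" + UUID.randomUUID() + ".txt");
        Files.write(target, element.getBytes(StandardCharsets.UTF_8));
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("per-element");
        PerElementFileWriter writer = new PerElementFileWriter(dir);
        for (String element : new String[] {"first", "second", "third"}) {
            writer.writeElement(element);
        }
        try (Stream<Path> files = Files.list(dir)) {
            // Prints 3: one file per element
            System.out.println(files.count());
        }
    }
}
```

Choosing the name inside `writeElement` is the key design point: it moves the "dynamic" part of the path from job-definition time to per-element processing time.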