How to generate output files for each input in Apache Flink


I'm using Flink to process my streaming data.

The stream comes from other middleware, such as Kafka or Pravega.

Suppose Pravega is sending a stream of words: hello world my name is....

I need three processing steps:

  1. Map each word to my custom class object MyJson.
  2. Map the object MyJson to String.
  3. Write the Strings to files, one String per file.

For example, for the stream hello world my name is, I should get five files.

Here is my code:

// init Pravega connector
PravegaDeserializationSchema<String> adapter =
        new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>());
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
        .withPravegaConfig(pravegaConfig)
        .forStream(stream)
        .withDeserializationSchema(adapter)
        .build();
// map stream to MyJson
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
        .map(new MapFunction<String, MyJson>() {
            @Override
            public MyJson map(String s) throws Exception {
                MyJson myJson = JSON.parseObject(s, MyJson.class);
                return myJson;
            }
        });
// map MyJson to String
DataStream<String> valueInJson = jsonStream
        .map(new MapFunction<MyJson, String>() {
            @Override
            public String map(MyJson myJson) throws Exception {
                return myJson.toString();
            }
        });
// output
valueInJson.print();

This code writes all of the results to the Flink log files.

My question is: how can I write each word to its own output file?


Solution

  • I think the easiest way to do this would be with a custom sink.

    stream.addSink(new WordFileSink());
    
    public static class WordFileSink implements SinkFunction<String> {
    
        @Override
        public void invoke(String value, Context context) {
            // generate a unique name for the new file and open it
            // write the word to the file
            // close the file
        }
    }
    

    Note that this implementation won't necessarily provide exactly-once behavior: after a failure, Flink replays records from the last checkpoint, so invoke may be called again for a word that has already been written. Take care that the file naming scheme is both unique and deterministic (rather than depending on processing time), and be prepared for the case where the file already exists.
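
To make the three commented steps in invoke concrete, here is a minimal sketch of the file-writing part using only the JDK. WordFileWriter, writeWord, and the word-<id>.txt naming scheme are hypothetical names made up for illustration, not part of Flink or Pravega. The sketch assumes every record carries a stable, unique id (for example a sequence number or offset travelling with the event); where that id comes from depends on your source and is not shown here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper (not a Flink class): writes one word per file.
final class WordFileWriter {

    private WordFileWriter() {}

    /**
     * Writes the word to <dir>/word-<id>.txt and returns the path.
     *
     * Assumption: id is a stable, unique identifier for the record, so a
     * replayed record maps to the same file name instead of a new one.
     */
    static Path writeWord(Path dir, long id, String word) {
        try {
            // Make sure the output directory exists.
            Files.createDirectories(dir);

            // Deterministic name: derived from the record id, not from the clock.
            Path target = dir.resolve("word-" + id + ".txt");

            // With no options, Files.write uses CREATE, TRUNCATE_EXISTING and WRITE,
            // so a file left over from an earlier attempt is overwritten.
            Files.write(target, word.getBytes(StandardCharsets.UTF_8));
            return target;
        } catch (IOException e) {
            // Rethrow unchecked so the caller fails visibly.
            throw new UncheckedIOException(e);
        }
    }
}
```

Inside WordFileSink.invoke you would then call something like WordFileWriter.writeWord(outputDir, id, value). Because the name depends only on the id and an existing file is overwritten with the same content, writing the same record twice after a restart leaves a single file. This makes the write idempotent, but it is still not a full exactly-once guarantee.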