
How to understand Window mechanism in Apache Flink


I'm learning how to use Flink to process streaming data.

As I understand it, I can use the map function to apply all kinds of transformations, as many times as I like.

Say the data source keeps sending Strings to Flink, all of which are JSON-formatted data like the following:

{"name":"titi","age":18}
{"name":"toto","age":20}
...

Here is my code:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkPravegaReader<String> source = FlinkPravegaReader.<String>builder()
    .withPravegaConfig(pravegaConfig)
    .forStream(stream)
    .withDeserializationSchema(new PravegaDeserializationSchema<>(String.class, new JavaSerializer<>()))
    .build();

// Convert String to Json Object
// MyJson is a POJO class, defined by me
DataStream<MyJson> jsonStream = env.addSource(source).name("Pravega Stream")
    .map(new MapFunction<String, MyJson>() {
        @Override
        public MyJson map(String s) throws Exception {
            // Parse each incoming JSON string into the MyJson POJO
            return JSON.parseObject(s, MyJson.class);
        }
    });
// Convert MyJson Object to String and extract what I need
DataStream<String> valueInJson = jsonStream
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            return myJson.getName().toString();
        }
    });
valueInJson.print();
env.execute("StreamingJob");

As you can see, my example is quite simple: read and deserialize the data -> transform each String into a JSON object -> transform the JSON object back into a String containing what I need (here, just the name).

For now, everything seems to work fine: I get the expected output in the log file.

However, I know that Flink provides a powerful mechanism: windows.

I want to know how to use this mechanism in my example.

For example, if I want to split the data stream into 2-second windows, how do I code this?
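To make "2-second windows" concrete: a tumbling window of 2 seconds cuts the timeline into fixed, non-overlapping slots [0, 2000), [2000, 4000), ... (in milliseconds), and every element lands in exactly one slot. The plain-Java sketch below does not use Flink at all; it only illustrates the assignment rule start = timestamp - (timestamp % size), which matches how Flink computes the start of a tumbling window with zero offset, assuming non-negative millisecond timestamps. The timestamps and the extra names (tata, tutu, tete) are made-up sample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java illustration (not Flink code) of how a tumbling window
 * splits a stream: each element goes to exactly one window
 * [start, start + size), where start = timestamp - (timestamp % size).
 */
public class TumblingWindowSketch {

    // Start of the tumbling window containing a non-negative timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    // Groups values by the window their timestamp falls into, ordered by window start.
    static Map<Long, List<String>> assign(long[] timestamps, String[] values, long sizeMillis) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = windowStart(timestamps[i], sizeMillis);
            windows.computeIfAbsent(start, k -> new ArrayList<>()).add(values[i]);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical arrival timestamps (ms) for five records
        long[] timestamps = {100, 1500, 2100, 3999, 4000};
        String[] names = {"titi", "toto", "tata", "tutu", "tete"};
        long size = 2000; // 2-second window

        Map<Long, List<String>> windows = assign(timestamps, names, size);
        for (Map.Entry<Long, List<String>> e : windows.entrySet()) {
            System.out.println("[" + e.getKey() + ", " + (e.getKey() + size) + ") -> " + e.getValue());
        }
    }
}
```

Running it prints one line per window, starting with `[0, 2000) -> [titi, toto]`; the record at 4000 ms opens a new window because the upper bound of each window is exclusive.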

I tried this:

DataStream<String> valueInJson = jsonStream
    .timeWindow(Time.seconds(2))
    .map(new MapFunction<MyJson, String>() {
        @Override
        public String map(MyJson myJson) throws Exception {
            return myJson.toString();
        }
    });
valueInJson.print();

However, I got an error:

cannot find symbol
symbol: method
timeWindow(org.apache.flink.streaming.api.windowing.time.Time)
location: variable jsonStream of type org.apache.flink.streaming.api.datastream.DataStream

But I have imported:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.time.Time;

Why did I get this error? Am I using windows incorrectly? Have I misunderstood something about Flink?


Solution

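  • You get this error because timeWindow() is defined on KeyedStream, not on DataStream, since it is a key-based operation. In your case, change timeWindow() to timeWindowAll() (or call keyBy() first if you want separate windows per key). Note that timeWindowAll() returns an AllWindowedStream, which has no map() method, so the .map(...) call that follows must also be replaced by a window function such as apply(), process(), reduce() or aggregate().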
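The keyed versus non-keyed distinction can be sketched in plain Java (again, not Flink code). A non-keyed window, which is what timeWindowAll() gives you, collects every element of the stream into one window per time slot; in Flink this windowing logic runs in a single task with parallelism 1. A keyed window, obtained with keyBy(...) followed by timeWindow(...), keeps a separate window per key and per slot, so different keys can be processed in parallel. The Event class, its explicit timestamps (i.e. event time), and the two example window functions (collecting names, counting per key) are hypothetical stand-ins for MyJson and for whatever function you would pass to apply(), process(), reduce() or aggregate().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-Java simulation (not Flink code) of a non-keyed window
 * (like timeWindowAll) versus a keyed window (like keyBy + timeWindow).
 * Event is a hypothetical stand-in for MyJson with an explicit timestamp.
 */
public class WindowAllVsKeyedSketch {

    static final class Event {
        final String name;
        final long timestampMillis;

        Event(String name, long timestampMillis) {
            this.name = name;
            this.timestampMillis = timestampMillis;
        }
    }

    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Non-keyed: one window per time slot containing ALL events;
    // the "window function" here just collects the names.
    static Map<Long, List<String>> namesPerWindowAll(List<Event> events, long size) {
        Map<Long, List<String>> result = new TreeMap<>();
        for (Event e : events) {
            result.computeIfAbsent(windowStart(e.timestampMillis, size), k -> new ArrayList<>())
                  .add(e.name);
        }
        return result;
    }

    // Keyed by name: one window per (key, time slot);
    // the "window function" here counts events, keyed as "name@windowStart".
    static Map<String, Integer> countPerKeyAndWindow(List<Event> events, long size) {
        Map<String, Integer> result = new TreeMap<>();
        for (Event e : events) {
            String slot = e.name + "@" + windowStart(e.timestampMillis, size);
            result.merge(slot, 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
                new Event("titi", 100),
                new Event("toto", 900),
                new Event("titi", 1500),
                new Event("toto", 2500));
        System.out.println(namesPerWindowAll(events, 2000));
        System.out.println(countPerKeyAndWindow(events, 2000));
    }
}
```

In Flink itself, the non-keyed variant would look roughly like jsonStream.timeWindowAll(Time.seconds(2)).apply(...) with an AllWindowFunction. In Flink 1.12 and later, timeWindow() and timeWindowAll() are deprecated; the equivalent is window(...) or windowAll(...) with an explicit assigner such as TumblingProcessingTimeWindows.of(Time.seconds(2)).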