
Apache Flink: How to process three streams


I want to receive and process three streams in one operator. For example, in Storm I implemented it as follows:

builder.setBolt("C_bolt", new C_bolt(), parallelism_hint)
        .fieldsGrouping("A_bolt", "TRAINING", new Fields("word"))
        .fieldsGrouping("B_bolt", "ANALYSIS", new Fields("word"))
        .allGrouping("A_bolt", "SUM");

In Flink, I have implemented the processing of the SUM stream (A_bolt's side output) and the TRAINING stream (A_bolt's main output) like this:

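SingleOutputStreamOperator<Tuple3<String, Integer, Boolean>> A_bolt; // main output: TRAINING stream
// SUM stream: A_bolt's side output, broadcast to every parallel instance (like allGrouping)
DataStream<Tuple2<Integer, Integer>> Sum = A_bolt.getSideOutput(outputTag).broadcast();
DataStream<Tuple3<String, String, Integer>> B_bolt; // ANALYSIS stream, not connected yet
DataStream<String> C_bolt = A_bolt
        // partition TRAINING by f0 (the word), like fieldsGrouping(..., new Fields("word"))
        .keyBy(new KeySelector<Tuple3<String, Integer, Boolean>, String>() {
            @Override
            public String getKey(Tuple3<String, Integer, Boolean> in) throws Exception {
                return in.f0;
            }
        })
        .connect(Sum)
        .flatMap(new Process()) // Process implements CoFlatMapFunction
        .setParallelism(parallelism);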

But I don't know how to add the ANALYSIS stream (B_bolt). Thank you for your help.


Solution

  • Flink only supports one-input and two-input stream operators. Your options are to:

    1. Use union() to create a merged stream containing all the elements from all three streams. The streams must all have the same type, though you could use Either to help with this.
    2. Use a CoFlatMapFunction to combine two of the streams, then connect that intermediate result to the third stream and complete the processing with another CoFlatMapFunction (or a CoProcessFunction).

    Or perhaps a combination of these two techniques is preferable in your case.
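For this particular topology, a combination of both options seems to fit: TRAINING and ANALYSIS are both partitioned by word, so they can be wrapped in `Either` and unioned into one keyed stream, while SUM has to reach every parallel instance and therefore stays a separate, broadcast input (a unioned stream has a single partitioning, so SUM cannot simply join the union). The sketch below assumes Flink 1.12 or a later 1.x release (for `executeAndCollect`) and that ANALYSIS tuples carry the word in `f0`. The class name `UnionThenConnect`, the `runDemo` helper, the sample data, the output strings and the reading of SUM's `f1` as a running total are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.types.Either;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class UnionThenConnect {

    // Merged element type: Left = TRAINING tuple, Right = ANALYSIS tuple.
    static final TypeHint<Either<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>>> MERGED =
            new TypeHint<Either<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>>>() {};

    public static DataStream<String> wire(DataStream<Tuple3<String, Integer, Boolean>> training,
                                          DataStream<Tuple3<String, String, Integer>> analysis,
                                          DataStream<Tuple2<Integer, Integer>> sum) {
        // union() needs a single element type, so wrap each input in Either first.
        DataStream<Either<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>>> merged = training
                .map(t -> Either.<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>>Left(t))
                .returns(MERGED)
                .union(analysis
                        .map(a -> Either.<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>>Right(a))
                        .returns(MERGED));

        return merged
                // Partition TRAINING and ANALYSIS by word (f0), like the two fieldsGrouping calls.
                .keyBy(e -> e.isLeft() ? e.left().f0 : e.right().f0)
                // Every parallel instance also receives all of SUM, like allGrouping.
                .connect(sum.broadcast())
                .flatMap(new CoFlatMapFunction<Either<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>>,
                        Tuple2<Integer, Integer>, String>() {
                    private Integer latestSum; // per-instance field, not checkpointed

                    @Override
                    public void flatMap1(Either<Tuple3<String, Integer, Boolean>, Tuple3<String, String, Integer>> e,
                                         Collector<String> out) {
                        if (e.isLeft()) {
                            out.collect("TRAINING " + e.left().f0 + " sum=" + latestSum);
                        } else {
                            out.collect("ANALYSIS " + e.right().f0 + " sum=" + latestSum);
                        }
                    }

                    @Override
                    public void flatMap2(Tuple2<Integer, Integer> s, Collector<String> out) {
                        latestSum = s.f1; // hypothetical: f1 holds the running sum
                    }
                });
    }

    // Hypothetical demo: bounded sources, results collected back to the client.
    public static List<String> runDemo() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStream<Tuple3<String, Integer, Boolean>> training =
                env.fromElements(Tuple3.of("hello", 1, true), Tuple3.of("world", 2, false));
        DataStream<Tuple3<String, String, Integer>> analysis = env.fromElements(Tuple3.of("hello", "x", 3));
        DataStream<Tuple2<Integer, Integer>> sum = env.fromElements(Tuple2.of(0, 42));

        List<String> results = new ArrayList<>();
        try (CloseableIterator<String> it = wire(training, analysis, sum).executeAndCollect()) {
            it.forEachRemaining(results::add);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        runDemo().forEach(System.out::println);
    }
}
```

The latest SUM value lives in a plain field, so it is per parallel instance and is lost on failure. If it has to survive restarts, Flink's broadcast state pattern (`sum.broadcast(mapStateDescriptor)` connected to the keyed stream and handled by a `KeyedBroadcastProcessFunction`) is the usual replacement for `broadcast()` plus a `CoFlatMapFunction`.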
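Option 2 can also be taken literally: keep the question's existing TRAINING + broadcast SUM operator as stage one, have it emit a preliminary `(word, value)` pair, then re-key that result by word and connect it to ANALYSIS, so both inputs of the second operator share a key and can use keyed state. This is a sketch assuming Flink 1.12 or a later 1.x release (for `executeAndCollect`) and ANALYSIS tuples with the word in `f0`; the class name `ChainedConnect`, the state name `lastTraining`, how stage one combines TRAINING with SUM, and the sample data are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;

public class ChainedConnect {

    public static DataStream<String> wire(DataStream<Tuple3<String, Integer, Boolean>> training,
                                          DataStream<Tuple3<String, String, Integer>> analysis,
                                          DataStream<Tuple2<Integer, Integer>> sum) {
        // Stage 1: TRAINING keyed by word, connected to the broadcast SUM stream (as in the question).
        DataStream<Tuple2<String, Integer>> preliminary = training
                .keyBy(t -> t.f0)
                .connect(sum.broadcast())
                .flatMap(new CoFlatMapFunction<Tuple3<String, Integer, Boolean>, Tuple2<Integer, Integer>,
                        Tuple2<String, Integer>>() {
                    private int latestSum; // per-instance field, not checkpointed

                    @Override
                    public void flatMap1(Tuple3<String, Integer, Boolean> t, Collector<Tuple2<String, Integer>> out) {
                        out.collect(Tuple2.of(t.f0, t.f1 + latestSum)); // hypothetical combination
                    }

                    @Override
                    public void flatMap2(Tuple2<Integer, Integer> s, Collector<Tuple2<String, Integer>> out) {
                        latestSum = s.f1; // hypothetical: f1 holds the running sum
                    }
                });

        // Stage 2: re-key the preliminary result by word and connect it to ANALYSIS, also keyed by word.
        return preliminary
                .keyBy(p -> p.f0)
                .connect(analysis.keyBy(a -> a.f0))
                .flatMap(new RichCoFlatMapFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String>() {
                    // Keyed state: one value per word, because both inputs are keyed the same way.
                    private final ValueStateDescriptor<Integer> lastDesc =
                            new ValueStateDescriptor<>("lastTraining", Integer.class);

                    @Override
                    public void flatMap1(Tuple2<String, Integer> p, Collector<String> out) throws Exception {
                        ValueState<Integer> last = getRuntimeContext().getState(lastDesc);
                        last.update(p.f1); // remember the latest preliminary value for this word
                    }

                    @Override
                    public void flatMap2(Tuple3<String, String, Integer> a, Collector<String> out) throws Exception {
                        ValueState<Integer> last = getRuntimeContext().getState(lastDesc);
                        out.collect("ANALYSIS " + a.f0 + " lastTraining=" + last.value());
                    }
                });
    }

    // Hypothetical demo: bounded sources, results collected back to the client.
    public static List<String> runDemo() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple3<String, Integer, Boolean>> training =
                env.fromElements(Tuple3.of("hello", 1, true), Tuple3.of("world", 2, false));
        DataStream<Tuple3<String, String, Integer>> analysis =
                env.fromElements(Tuple3.of("hello", "x", 3), Tuple3.of("world", "y", 4));
        DataStream<Tuple2<Integer, Integer>> sum = env.fromElements(Tuple2.of(0, 42));

        List<String> results = new ArrayList<>();
        try (CloseableIterator<String> it = wire(training, analysis, sum).executeAndCollect()) {
            it.forEachRemaining(results::add);
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        runDemo().forEach(System.out::println);
    }
}
```

Flink gives no ordering guarantee between the two inputs of a connected operator, so an ANALYSIS element can arrive before any TRAINING value for its word, in which case `lastTraining` is `null`. Note also that state kept in stage one is not visible in stage two; anything the second operator needs has to be emitted by the first.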