
How to filter an Apache Flink stream on the basis of another?


I have two streams: one of Ints and the other of JSON. The JSON schema has one key whose value is an int. I need to filter the JSON stream by comparing that key against the values in the integer stream. Is this possible in Flink?


Solution

  • Yes, you can do this kind of stream processing with Flink. The basic building blocks you need are connected streams and stateful functions. Here's an example using a RichCoFlatMapFunction:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
    import org.apache.flink.util.Collector;
    
    public class Connect {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
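            // Key both streams by the integer field so that events with the same
            // key reach the same parallel instance and share its keyed state.
            // A key selector is used instead of the string field expression "key".
            DataStream<Event> control = env.fromElements(
                    new Event(17),
                    new Event(42))
                    .keyBy(event -> event.key);
    
            DataStream<Event> data = env.fromElements(
                    new Event(2),
                    new Event(42),
                    new Event(6),
                    new Event(17),
                    new Event(8),
                    new Event(42)
                    )
                    .keyBy(event -> event.key);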
    
            DataStream<Event> result = control
                    .connect(data)
                    .flatMap(new MyConnectedStreams());
    
            result.print();
    
            env.execute();
        }
    
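        // Input 1 (control) marks its key as seen; input 2 (data) is forwarded
        // only if its key has already been seen. Because both inputs are keyed,
        // the 'seen' state is scoped to the key of the current event.
        static final class MyConnectedStreams
                extends RichCoFlatMapFunction<Event, Event, Event> {
    
            private transient ValueState<Boolean> seen;
    
            @Override
            public void open(Configuration config) {
                ValueStateDescriptor<Boolean> descriptor = new ValueStateDescriptor<>(
                        // state name
                        "have-seen-key",
                        // type information of state
                        TypeInformation.of(new TypeHint<Boolean>() {
                        }));
                seen = getRuntimeContext().getState(descriptor);
            }
    
            @Override
            public void flatMap1(Event control, Collector<Event> out) throws Exception {
                // Remember that this key has appeared on the control stream.
                seen.update(Boolean.TRUE);
            }
    
            @Override
            public void flatMap2(Event data, Collector<Event> out) throws Exception {
                // value() is null for keys never updated, so use equals() instead of ==.
                if (Boolean.TRUE.equals(seen.value())) {
                    out.collect(data);
                }
            }
        }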
    
    
        public static final class Event {
            public Event() {
            }
    
            public Event(int key) {
                this.key = key;
            }
    
            public int key;
    
            @Override
            public String toString() {
                return String.valueOf(key);
            }
        }
    }
    

    In this example, only data events whose key has already been seen on the control stream are passed through; all other data events are filtered out. This relies on Flink's managed keyed state and connected streams: because both streams are keyed by the same field, each key gets its own "seen" flag.
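    To make the filtering rule concrete without a Flink cluster, here is a minimal plain-Java model of what the operator does for one sequence of arrivals. It is not Flink code: the `HashMap` stands in for the per-key `ValueState<Boolean>`, and the class and helper names (`SeenFilterSimulation`, `Arrival`, `control`, `data`) are hypothetical. It ignores parallelism and partitioning and only models the per-key logic of `flatMap1`/`flatMap2`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of the keyed "seen" filter.
public class SeenFilterSimulation {

    // One element arriving at the operator: which input it came from and its key.
    static final class Arrival {
        final boolean isControl;
        final int key;

        Arrival(boolean isControl, int key) {
            this.isControl = isControl;
            this.key = key;
        }
    }

    static Arrival control(int key) { return new Arrival(true, key); }
    static Arrival data(int key) { return new Arrival(false, key); }

    // Processes arrivals in the given order. The map stands in for the
    // per-key ValueState<Boolean>: one flag per key, null if never set.
    static List<Integer> run(List<Arrival> arrivals) {
        Map<Integer, Boolean> seen = new HashMap<>();
        List<Integer> out = new ArrayList<>();
        for (Arrival a : arrivals) {
            if (a.isControl) {
                seen.put(a.key, Boolean.TRUE);              // like flatMap1
            } else if (Boolean.TRUE.equals(seen.get(a.key))) {
                out.add(a.key);                             // like flatMap2
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Same values as the Flink example, with all control events arriving first.
        List<Arrival> arrivals = List.of(
                control(17), control(42),
                data(2), data(42), data(6), data(17), data(8), data(42));
        System.out.println(run(arrivals)); // prints [42, 17, 42]
    }
}
```

    If the data event arrives before its control event, for example `run(List.of(data(42), control(42)))`, the result is empty; that is the ordering sensitivity mentioned at the end of this answer.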

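    To keep things simple, I've ignored your requirement that the data stream contains JSON. In practice you would first parse each JSON record and extract the integer field you want to key by; you can find examples of how to work with JSON in Flink elsewhere.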
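    As a rough illustration of the "extract the integer field" step, here is a dependency-free sketch. The field name `id` is a hypothetical stand-in for whatever key your JSON schema uses, and the regex is a toy that only handles a flat `"id": <integer>` pair (no nesting, no escaping, no overflow handling). In a real job you would more likely parse with a JSON library such as Jackson inside a map function, then key the resulting stream by the extracted value as in the example above.

```java
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy extractor for a top-level integer field in a JSON string.
public class JsonKeyExtractor {

    // Hypothetical field name; replace with the actual key from your JSON schema.
    static final String FIELD = "id";

    // Matches "id": <integer>. Does not understand nested objects or escaped quotes.
    static final Pattern PATTERN =
            Pattern.compile("\"" + Pattern.quote(FIELD) + "\"\\s*:\\s*(-?\\d+)");

    // Returns the integer value of FIELD, or empty if it is not found.
    // Integer.parseInt throws NumberFormatException for values outside int range.
    static OptionalInt extractKey(String json) {
        Matcher m = PATTERN.matcher(json);
        return m.find() ? OptionalInt.of(Integer.parseInt(m.group(1))) : OptionalInt.empty();
    }

    public static void main(String[] args) {
        System.out.println(extractKey("{\"id\": 42, \"name\": \"a\"}")); // prints OptionalInt[42]
        System.out.println(extractKey("{\"name\": \"b\"}"));             // prints OptionalInt.empty
    }
}
```

    Records for which the key is missing could simply be dropped before the `keyBy`, since they can never match the control stream.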

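    Note that your results will be non-deterministic, since you have no control over the timing of the two streams relative to one another: a data event that arrives before the matching control event is dropped. You could manage this by adding event-time timestamps to both streams and then using a KeyedCoProcessFunction (or CoProcessFunction) instead, buffering events in keyed state until the watermark shows it is safe to decide.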
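    To show what event time buys you, here is a plain-Java model (again not Flink code) of one possible event-time rule, which is an assumption on my part: a data event passes if a control event with the same key has a timestamp less than or equal to its own. Sorting the events by timestamp stands in for what buffering in keyed state plus event-time timers would achieve in a real job. The names `EventTimeFilterModel`, `Ev`, `control` and `data` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of an event-time version of the filter.
public class EventTimeFilterModel {

    static final class Ev {
        final boolean isControl;
        final int key;
        final long timestamp;

        Ev(boolean isControl, int key, long timestamp) {
            this.isControl = isControl;
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    static Ev control(int key, long ts) { return new Ev(true, key, ts); }
    static Ev data(int key, long ts) { return new Ev(false, key, ts); }

    // Assumed rule: a data event passes if a control event with the same key
    // has timestamp <= the data event's timestamp. Events are processed in
    // timestamp order regardless of arrival order; on ties, control goes first.
    static List<String> run(List<Ev> arrivals) {
        List<Ev> ordered = new ArrayList<>(arrivals);
        ordered.sort(Comparator.comparingLong((Ev e) -> e.timestamp)
                .thenComparingInt(e -> e.isControl ? 0 : 1));
        Map<Integer, Boolean> seen = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Ev e : ordered) {
            if (e.isControl) {
                seen.put(e.key, Boolean.TRUE);
            } else if (Boolean.TRUE.equals(seen.get(e.key))) {
                out.add(e.key + "@" + e.timestamp);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Ev> controlFirst = List.of(
                control(17, 1), control(42, 5),
                data(42, 3), data(17, 4), data(42, 7));
        List<Ev> dataFirst = List.of(
                data(42, 3), data(17, 4), data(42, 7),
                control(17, 1), control(42, 5));
        System.out.println(run(controlFirst)); // prints [17@4, 42@7]
        System.out.println(run(dataFirst));    // prints [17@4, 42@7]
    }
}
```

    Both arrival orders give the same answer. With the processing-order filter from the main example, the control-first order would also let `42@3` through, and the data-first order would emit nothing.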