apache-flink flink-streaming flink-cep

Joining more than 2 streams in Flink & applying CEP to more than 2 streams without joining them


Question #1: I am working on a scenario where we need to fuse data from multiple sensors (8 sensors, for instance) and join them in a tree form. For example, joining [s1, s2, s3, s4] to form stream A, then [s5, s6, s7, s8] to form stream B, and then performing CEP on streams A and B. How can I achieve this?

Question #2: Is it possible to perform CEP on multiple streams, that is, on more than one stream? The Flink 1.3.2 API documentation clearly states that a pattern is applied to a single stream:

DataStream<Event> input = ...
Pattern<Event, ?> pattern = ...

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

If a pattern cannot be applied to more than one stream, how does Flink CEP handle the classic CEP example that combines a smoke stream and a temperature stream to raise an alert when there is a fire?

  • Is joining the smoke and temperature streams on some key, such as the timestamp, the only solution?

  • If so, how can Flink be applied to the broad range of IoT use cases that involve multiple sensors?


Solution

  • This depends on how you read data from the sensors. If the data arrives on different Kafka topics, you can create two Flink jobs.

    Job 1 reads from the sensor topics s1, s2, s3, s4 and creates stream A, while another Kafka consumer reads from s5, s6, s7, s8 and creates stream B. You then write the data from these two streams to two intermediate topics, streamA and streamB.

    Job 2 reads from the Kafka topics streamA and streamB together and creates a single DataStream.

    Keep in mind that, to get accurate results, you should rely on the event time of your sensor data and not on ingestion time or processing time.

    Once the two streams are combined into one, running CEP on it is not difficult, as you can see from this previous question: Process multiple streams in Flink CEP
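The tree-shaped fusion in Job 1 comes down to mapping every sensor onto one common event type and merging the results level by level. In Flink, `DataStream#union` merges streams that share a type (streams of different types would need a `map` to a shared type first). The sketch below is plain Java, not Flink code: lists stand in for streams, and `SensorTree`, `SensorEvent`, `sensor` and `union` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the fusion tree in Job 1; lists stand in for Flink streams.
class SensorTree {

    // Hypothetical common event type that every sensor is mapped to.
    static final class SensorEvent {
        final String sensorId;
        final long eventTime; // timestamp set by the sensor, in ms
        final double value;

        SensorEvent(String sensorId, long eventTime, double value) {
            this.sensorId = sensorId;
            this.eventTime = eventTime;
            this.value = value;
        }
    }

    // A one-element "stream" holding a single sensor reading.
    static List<SensorEvent> sensor(String id, long eventTime, double value) {
        return Collections.singletonList(new SensorEvent(id, eventTime, value));
    }

    // Merges inputs of the same type into one, analogous to DataStream#union.
    @SafeVarargs
    static List<SensorEvent> union(List<SensorEvent>... inputs) {
        List<SensorEvent> out = new ArrayList<>();
        for (List<SensorEvent> in : inputs) {
            out.addAll(in);
        }
        return out;
    }

    public static void main(String[] args) {
        // Leaf level: s1..s4 form stream A, s5..s8 form stream B.
        List<SensorEvent> streamA = union(sensor("s1", 1000L, 20.1), sensor("s2", 1010L, 20.4),
                sensor("s3", 1020L, 19.8), sensor("s4", 1030L, 21.0));
        List<SensorEvent> streamB = union(sensor("s5", 1005L, 0.1), sensor("s6", 1015L, 0.3),
                sensor("s7", 1025L, 0.2), sensor("s8", 1035L, 0.4));
        // Root level: the two intermediate streams feed a single stream for CEP.
        List<SensorEvent> combined = union(streamA, streamB);
        System.out.println("A=" + streamA.size() + " B=" + streamB.size()
                + " combined=" + combined.size());
    }
}
```

Running `main` prints `A=4 B=4 combined=8`. In the two-job setup from the answer, the leaf merges happen in Job 1 and the root merge happens in Job 2 when it reads both intermediate topics.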
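The advice to rely on event time matters because Job 2 receives records from streamA and streamB interleaved in whatever order they arrive, not in the order the sensors produced them. Flink addresses this with timestamps and watermarks; for example, `BoundedOutOfOrdernessTimestampExtractor` emits watermarks that trail the highest timestamp seen so far by a fixed bound. The plain-Java sketch below imitates that idea with a priority-queue buffer; `EventTimeReorder`, `Event`, `reorder` and the 500 ms bound are assumptions for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Plain-Java sketch of event-time reordering with a bounded-out-of-orderness watermark.
class EventTimeReorder {

    static final class Event {
        final String source;  // e.g. "streamA" or "streamB"
        final long eventTime; // timestamp assigned by the sensor, in ms

        Event(String source, long eventTime) {
            this.source = source;
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return source + "@" + eventTime;
        }
    }

    // Emits events in timestamp order, assuming no event arrives more than
    // maxOutOfOrderness ms behind the highest timestamp seen so far. An event that
    // violates the bound is still emitted, but out of order (Flink would call it late).
    static List<Event> reorder(List<Event> arrivals, long maxOutOfOrderness) {
        PriorityQueue<Event> buffer =
                new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.eventTime));
        List<Event> out = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        for (Event e : arrivals) {
            buffer.add(e);
            maxSeen = Math.max(maxSeen, e.eventTime);
            long watermark = maxSeen - maxOutOfOrderness;
            // Release every buffered event the watermark declares complete.
            while (!buffer.isEmpty() && buffer.peek().eventTime <= watermark) {
                out.add(buffer.poll());
            }
        }
        // End of input: flush whatever remains, still in timestamp order.
        while (!buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        // Arrival order as Job 2 might see it when reading both topics.
        List<Event> arrivals = Arrays.asList(
                new Event("streamB", 1200L), new Event("streamA", 1000L),
                new Event("streamA", 1500L), new Event("streamB", 1100L),
                new Event("streamA", 2600L));
        System.out.println(reorder(arrivals, 500L));
    }
}
```

For the sample arrivals in `main`, the output is `[streamA@1000, streamB@1100, streamB@1200, streamA@1500, streamA@2600]`. A larger bound tolerates more disorder at the cost of higher latency before events are released.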
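On Question #2: `CEP.pattern` takes a single stream, but that stream can carry events from many sensors once they share a common type, so the smoke/temperature example becomes one pattern over one merged stream. In Flink CEP such a pattern would be built with `Pattern.begin(...)`, `where(...)`, `followedBy(...)` and `within(...)`. The plain-Java sketch below only imitates that matching logic; the `FireAlert` and `Reading` names, the `"smoke"`/`"temp"` labels, the thresholds and the 5 s window are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of "smoke followed by high temperature within a time window",
// evaluated over ONE merged, event-time-ordered stream. Not Flink CEP code.
class FireAlert {

    // Hypothetical common type: both sensor kinds are mapped to it before merging.
    static final class Reading {
        final String type;    // "smoke" or "temp"
        final long eventTime; // ms
        final double value;

        Reading(String type, long eventTime, double value) {
            this.type = type;
            this.eventTime = eventTime;
            this.value = value;
        }
    }

    // Returns the event times of hot temperature readings that follow a smoke
    // reading within windowMs. Assumes the input is sorted by eventTime and pairs
    // each temperature reading only with the most recent qualifying smoke reading.
    static List<Long> detect(List<Reading> stream, double smokeThreshold,
                             double tempThreshold, long windowMs) {
        List<Long> alerts = new ArrayList<>();
        boolean sawSmoke = false;
        long lastSmokeTime = 0L;
        for (Reading r : stream) {
            if (r.type.equals("smoke") && r.value >= smokeThreshold) {
                sawSmoke = true;
                lastSmokeTime = r.eventTime;
            } else if (r.type.equals("temp") && r.value >= tempThreshold
                    && sawSmoke && r.eventTime - lastSmokeTime <= windowMs) {
                alerts.add(r.eventTime);
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        List<Reading> merged = Arrays.asList(
                new Reading("smoke", 1000L, 0.8),
                new Reading("temp", 3000L, 70.0),   // 2 s after smoke: alert
                new Reading("temp", 9000L, 75.0),   // 8 s after smoke: outside window
                new Reading("smoke", 10000L, 0.2),  // below smoke threshold: ignored
                new Reading("temp", 11000L, 65.0)); // last qualifying smoke is too old
        System.out.println(detect(merged, 0.5, 60.0, 5000L)); // prints [3000]
    }
}
```

This is deliberately simplified: it pairs each hot temperature reading only with the most recent qualifying smoke reading, whereas Flink CEP can report a separate match for each qualifying smoke event. No join on timestamps is needed; the time constraint lives inside the pattern.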