When I run my Flink CEP code with processing time (the default configuration), I get the expected pattern matches, but when I configure the environment for event time I do not get any pattern match.
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.enableCheckpointing(3000) // checkpoint every 3000 msec
  val lines = env.addSource(consumerKafkaSource.consume("bank_transaction_2", "192.168.2.201:9092", "192.168.2.201:2181", "http://192.168.2.201:8081"))
  val eventdate = ExtractAndAssignEventTime.assign(lines, "unix", "datetime", 3) // Extracting date time here
  val event = eventdate.keyBy(v => v.get("customer_id").toString.toInt)
  val pattern1 = Pattern.begin[GenericRecord]("start").where(v => v.get("state").toString == "FAILED").next("d").where(v => v.get("state").toString == "FAILED")
  val patternStream = CEP.pattern(event, pattern1)
  val warnID = patternStream.sideOutputLateData(latedata).select(value => {
    val v = value.mapValues(c => c.toList.toString)
    Json(DefaultFormats).write(v).replace("\\\"", "\"")
    //.replace("List(","{").replace(")","}")
  })
  val latedatastream = warnID.getSideOutput(latedata)
  latedatastream.print("late_data")
  warnID.print("warning")
  event.print("event")
Timestamp extraction code
object ExtractAndAssignEventTime {
  def assign(stream: DataStream[GenericRecord], timeFormat: String, timeColumn: String, OutofOrderTime: Int): DataStream[GenericRecord] = {
    if (!(timeFormat.equalsIgnoreCase("Unix"))) {
      val EventTimeStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(3)) {
        override def extractTimestamp(t: GenericRecord): Long = {
          new java.text.SimpleDateFormat(timeFormat).parse(t.get(timeColumn).toString).getTime
        }
      })
      EventTimeStream
    }
    else {
      val EventTimeStream = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(OutofOrderTime)) {
        override def extractTimestamp(t: GenericRecord): Long = {
          (t.get(timeColumn).toString.toLong)
        }
      })
      EventTimeStream
    }
  }
Please help me solve this issue. Thanks in advance!
Since you are using an AssignerWithPeriodicWatermarks (BoundedOutOfOrdernessTimestampExtractor is one), you also need to set the auto-watermark interval, which Flink uses to decide how often to generate watermarks.
You can do this by calling env.getConfig.setAutoWatermarkInterval([interval]).
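As a minimal sketch of where that call could go, assuming the same environment setup as in the question: the 1000 ms value and the object name WatermarkIntervalSketch are only illustrative choices, not something taken from the original job.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical object name, used only for this illustration.
object WatermarkIntervalSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Assumed value: ask periodic assigners for a new watermark every 1000 ms.
    env.getConfig.setAutoWatermarkInterval(1000L)
    // Print the effective interval to confirm the setting was applied.
    println(env.getConfig.getAutoWatermarkInterval)
  }
}
```

The call has to happen on the same env that builds the job, before env.execute is invoked.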
In event time, CEP relies on watermarks: incoming events are buffered until a watermark passes their timestamp, so if no watermarks are generated there will be basically no output.
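To illustrate why missing watermarks lead to no output, here is a simplified plain-Scala model of the buffering behaviour. It is not Flink code: Event, released, and the emitWatermarks flag are hypothetical names, the watermark is computed as the highest timestamp seen minus the out-of-orderness bound, and it is recomputed after every event rather than on a timer.

```scala
object WatermarkSimulation {
  // Hypothetical event type: an id plus an event-time timestamp in milliseconds.
  final case class Event(id: String, timestamp: Long)

  /** Returns the ids handed to the pattern matcher, in release order.
    * emitWatermarks = false models a job in which no watermark is ever produced. */
  def released(events: Seq[Event], boundMs: Long, emitWatermarks: Boolean): Seq[String] = {
    var maxTs = Long.MinValue
    var buffer = Vector.empty[Event]
    val out = Vector.newBuilder[String]
    for (e <- events) {
      buffer :+= e
      maxTs = math.max(maxTs, e.timestamp)
      if (emitWatermarks) {
        // Bounded out-of-orderness: the watermark trails the highest timestamp by boundMs.
        val watermark = maxTs - boundMs
        val (ready, waiting) = buffer.partition(_.timestamp <= watermark)
        // Only events at or below the watermark are released, sorted by timestamp.
        out ++= ready.sortBy(_.timestamp).map(_.id)
        buffer = waiting
      }
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val events = Seq(Event("a", 1000L), Event("b", 2000L), Event("c", 6000L))
    println(released(events, 3000L, emitWatermarks = true))  // Vector(a, b)
    println(released(events, 3000L, emitWatermarks = false)) // Vector()
  }
}
```

With watermarks, "a" and "b" are released once "c" pushes the watermark to 3000, while "c" itself stays buffered. Without watermarks, everything stays buffered, which matches the symptom of seeing events but no matches.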