
Flink CEP iterative condition for single pattern in Scala


I am unable to compute a running sum within a single CEP pattern in Scala. I want to detect when the sum of amounts exceeds 6100 for a particular customer ID. I am passing a keyed stream to CEP.pattern(...). My code for constructing the pattern is below.

val pattern1 = Pattern.begin[GenericRecord]("start").where((v, ctx) => {
  lazy val sum = ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

  print((sum + v.get("amount").toString.toLong).toString)
  //print(sum+v.get("amount").toString.toLong>6100)
  //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
  (sum + v.get("amount").toString.toLong) > 6100 && v.get("state").toString == "FAILED"
}).oneOrMore

My input is in Avro format, and Flink consumes it from Kafka. The input looks like this:

{"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}
{"trasanction_id":198,"customer_id":27,"datetime":"2017-09-01 12:36:04","amount":6024,"state":"FAILED"}
{"trasanction_id":199,"customer_id":27,"datetime":"2017-09-01 12:36:05","amount":2399,"state":"FAILED"}
{"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}

However, the code below works as expected when I use two patterns:

val pattern1 = Pattern.begin[GenericRecord]("start").followedBy("middle").where((v, ctx) => {
  lazy val sum = ctx.getEventsForPattern("start").map(_.get("amount").toString.toInt).sum

  print((sum + v.get("amount").toString.toLong).toString)
  //print(sum+v.get("amount").toString.toLong>6100)
  //println(v.get("customer_id").toString+v.get("amount").toString+" , ")
  (sum + v.get("amount").toString.toLong) > 6100 && v.get("state").toString == "FAILED"
}).oneOrMore

Solution

  • getEventsForPattern returns only the events that the pattern has already accepted. Let's analyze customer 28. When processing the event

    {"trasanction_id":196,"customer_id":28,"datetime":"2017-09-01 12:35:08","amount":6094,"state":"FAILED"}


    your first snippet rejects it because it does not satisfy the condition: sum + amount = 0 + 6094 < 6100. When processing

    {"trasanction_id":197,"customer_id":28,"datetime":"2017-09-01 12:36:36","amount":547,"state":"FAILED"}


    nothing has been accepted into "start" yet, so your condition once again checks whether 0 + 547 > 6100, which is false. This is why you see no output.

    In your second example you use the followedBy operator, which means you are matching a sequence of at least two events. The first transaction is accepted unconditionally (the "start" pattern has no where clause), so it is returned by the ctx.getEventsForPattern("start") call when the condition on "middle" is evaluated. For customer 28 the check becomes 6094 + 547 = 6641 > 6100, which holds. I hope this makes the behaviour of the code clear.
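The difference between the two snippets can be reproduced without Flink. The following is a simplified plain-Scala model, not the CEP engine itself: `Tx`, `condition`, `singlePattern` and `startThenMiddle` are hypothetical names introduced only for illustration, and the model follows a single match attempt that starts at the first event, whereas real CEP may explore several partial matches.

```scala
// Simplified model of the two patterns from the question; no Flink dependency.
object IterativeConditionSketch {
  final case class Tx(id: Int, customerId: Int, amount: Long, state: String)

  // Mirrors the where-clause: sum of the events already accepted into "start"
  // plus the current amount must exceed the threshold, and the state must be FAILED.
  def condition(acceptedInStart: Seq[Tx], current: Tx, threshold: Long): Boolean =
    acceptedInStart.map(_.amount).sum + current.amount > threshold &&
      current.state == "FAILED"

  // First snippet: one looping pattern. Every event, including the very first,
  // has to pass the condition before it can be added to "start".
  def singlePattern(events: Seq[Tx], threshold: Long): Seq[Tx] =
    events.foldLeft(Vector.empty[Tx]) { (accepted, e) =>
      if (condition(accepted, e, threshold)) accepted :+ e else accepted
    }

  // Second snippet: "start" has no condition, so the first event is always
  // accepted; later events are checked against that single "start" event.
  def startThenMiddle(events: Seq[Tx], threshold: Long): Seq[Tx] = events match {
    case first +: rest => rest.filter(e => condition(Seq(first), e, threshold))
    case _             => Seq.empty
  }

  // The two events for customer 28 from the sample input.
  val customer28: Seq[Tx] = Seq(
    Tx(196, 28, 6094L, "FAILED"),
    Tx(197, 28, 547L, "FAILED")
  )

  def main(args: Array[String]): Unit = {
    // No event is accepted: 0 + 6094 and 0 + 547 are both below the threshold.
    println(singlePattern(customer28, 6100L).map(_.id))
    // Event 197 is accepted: 6094 + 547 = 6641 exceeds the threshold.
    println(startThenMiddle(customer28, 6100L).map(_.id))
  }
}
```

In the single-pattern model the accepted list can only grow once some event clears the threshold on its own, which is why the sum never gets started.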


    CEP is mostly used to spot patterns in data, not to aggregate them. Your problem can be approached by doing windowing followed by filtering - no need to use CEP here.
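To illustrate the windowing-then-filtering idea, here is a minimal sketch on plain Scala collections rather than the Flink API. It assumes a 5-minute tumbling window aligned to the hour, which the question does not specify, and `Tx`, `windowStart` and `overThreshold` are hypothetical names. In a Flink job the same steps would roughly correspond to keying the stream by customer, applying a window, summing the amounts and filtering the result.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

// Batch emulation of "window, sum, filter"; not Flink code.
object WindowedSumSketch {
  final case class Tx(customerId: Int, datetime: LocalDateTime, amount: Long, state: String)

  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  def parse(s: String): LocalDateTime = LocalDateTime.parse(s, fmt)

  // Start of the tumbling window containing t.
  // Assumption: windowMinutes divides 60, so windows align to the hour.
  def windowStart(t: LocalDateTime, windowMinutes: Int): LocalDateTime =
    t.truncatedTo(ChronoUnit.HOURS)
      .plusMinutes((t.getMinute / windowMinutes) * windowMinutes.toLong)

  // Sum FAILED amounts per (customer, window) and keep sums above the threshold.
  def overThreshold(
      events: Seq[Tx],
      windowMinutes: Int,
      threshold: Long
  ): Map[(Int, LocalDateTime), Long] =
    events
      .filter(_.state == "FAILED")
      .groupBy(e => (e.customerId, windowStart(e.datetime, windowMinutes)))
      .map { case (key, txs) => key -> txs.map(_.amount).sum }
      .filter { case (_, total) => total > threshold }

  // The four sample records from the question.
  val sample: Seq[Tx] = Seq(
    Tx(28, parse("2017-09-01 12:35:08"), 6094L, "FAILED"),
    Tx(27, parse("2017-09-01 12:36:04"), 6024L, "FAILED"),
    Tx(27, parse("2017-09-01 12:36:05"), 2399L, "FAILED"),
    Tx(28, parse("2017-09-01 12:36:36"), 547L, "FAILED")
  )

  def main(args: Array[String]): Unit =
    overThreshold(sample, 5, 6100L).foreach { case ((customer, start), total) =>
      println(s"customer=$customer window=$start total=$total")
    }
}
```

With the sample input, both customers exceed 6100 within the 12:35 window (6641 for customer 28 and 8423 for customer 27). A different window size or alignment could split the events across windows and change the result.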