apache-kafka apache-flink flink-cep complex-event-processing

Apache Flink CEP pattern of multiple event types


I'm currently working on a semester project where I have to recognize a sequence of three events, such as P -> R -> P.

We have two different event types, both consumed from the same topic via a Kafka connector.

I created a parent class called Event, from which the other two types derive.

The Kafka connector uses EventSchema to deserialize the JSON into the parent class Event.

val consumer = new FlinkKafkaConsumer("events", new EventSchema, properties)
val stream = env.addSource(consumer)

The pattern looks like this:

val pattern = Pattern
  .begin[Event]("before")
  .subtype(classOf[Position])
  .next("recognized")
  .subtype(classOf[Recognized])
  .next("after")
  .subtype(classOf[Position])

The problem is that when I send three messages in the appropriate format, the pattern is not recognized.

I also tried changing the pattern like this:

val pattern = Pattern
  .begin[Event]("before")
  .where(e => e.getType == "position")
  .next("recognized")
  .where(e => e.getType == "recognition")
  .next("after")
  .where(e => e.getType == "position")

This pattern works, but afterwards I can't cast the matched Event instances to Position or Recognized.

What am I missing here?


Solution

  • Based on the comments, I think your deserialization schema should return instances of the subtypes instead of the parent Event, because subtype() checks the runtime class of each element. Here is some example code:

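    // Read the parent first to inspect the type tag, then re-read the bytes as the concrete subtype.
    val event = mapper.readValue(bytes, classOf[Event])
    event.getType match {
      case "position"    => mapper.readValue(bytes, classOf[Position])
      case "recognition" => mapper.readValue(bytes, classOf[Recognized])
      case _             => event // unknown type: fall back to the parent Event
    }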
    

    I successfully tried the example from a test case in CEPITCase.java.

    DataStream<Event> input = env.fromElements(
      new Event(1, "foo", 4.0),
      new SubEvent(2, "foo", 4.0, 1.0),
      new SubEvent(3, "foo", 4.0, 1.0),
      new SubEvent(4, "foo", 4.0, 1.0),
      new Event(5, "middle", 5.0)
    );
    
    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").subtype(SubEvent.class)
      .followedByAny("middle").subtype(SubEvent.class)
      .followedByAny("end").subtype(SubEvent.class);
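
To make the reasoning concrete, here is a minimal sketch in plain Java with no Flink or Jackson dependency. It assumes that subtype(...) comes down to a runtime class check, which is my understanding of how the condition behaves. The classes below are hypothetical stand-ins for the Event, Position and Recognized classes from the question, and toConcrete and isPositionRecognizedPosition are illustrative helpers, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubtypeDispatchSketch {

    // Hypothetical stand-ins for the question's classes; the real ones carry more fields.
    static class Event {
        final String type;
        Event(String type) { this.type = type; }
    }

    static class Position extends Event {
        Position() { super("position"); }
    }

    static class Recognized extends Event {
        Recognized() { super("recognition"); }
    }

    // Build the concrete subclass from the type tag, falling back to the parent
    // for unknown tags. This mirrors what the deserialization schema should do.
    static Event toConcrete(String type) {
        switch (type) {
            case "position":    return new Position();
            case "recognition": return new Recognized();
            default:            return new Event(type);
        }
    }

    // Strict P -> R -> P check based on the runtime class of each element
    // (assumed to be the same kind of test a subtype condition performs).
    static boolean isPositionRecognizedPosition(List<Event> events) {
        return events.size() == 3
            && Position.class.isInstance(events.get(0))
            && Recognized.class.isInstance(events.get(1))
            && Position.class.isInstance(events.get(2));
    }

    public static void main(String[] args) {
        List<String> tags = Arrays.asList("position", "recognition", "position");

        List<Event> parentsOnly = new ArrayList<>();
        List<Event> concrete = new ArrayList<>();
        for (String tag : tags) {
            parentsOnly.add(new Event(tag));   // what a parent-only schema produces
            concrete.add(toConcrete(tag));     // what a subtype-aware schema produces
        }

        // Plain Event objects never pass the class check, even with the right type tag.
        System.out.println("parent-only: " + isPositionRecognizedPosition(parentsOnly)); // false
        System.out.println("concrete:    " + isPositionRecognizedPosition(concrete));    // true

        // With concrete instances, the cast that failed in the question is safe.
        Position first = (Position) concrete.get(0);
        System.out.println("first type:  " + first.type); // position
    }
}
```

The same idea explains both symptoms in the question: with parent-only objects the subtype pattern never matches, and the where(...) variant matches but leaves you with objects that cannot be cast, because they really are plain Event instances.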