
Using Apache Flink to consume from a Kafka topic then processing the stream with Flink CEP


In this project, I'm trying to consume data from a Kafka topic using Flink and then process the stream to detect a pattern using Flink CEP. The Kafka connector part works and data is being fetched, but the CEP part doesn't produce any output for some reason. I'm using Scala in this project.

build.sbt:


version := "0.1"

scalaVersion := "2.11.12"

libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.12.2"

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.3.0"

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.12.2"


libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.12.2" 

the main code:

import org.apache.flink.api.common.serialization.SimpleStringSchema

import java.util
import java.util.Properties
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.scala._
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.cep.pattern.conditions.IterativeCondition

object flinkExample {
  def main(args: Array[String]): Unit = {


    val CLOSE_THRESHOLD: Double = 140.00

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test")

    val consumer = new FlinkKafkaConsumer[String]("test", new SimpleStringSchema(), properties)
    consumer.setStartFromEarliest()

    val see: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val src: DataStream[String] = see.addSource(consumer)
    val keyedStream: DataStream[Stock] = src
      .map { v =>
        val data = v.split(":")
        val date = data(0)
        val close = data(1).toDouble
        Stock(date, close)
      }

    val pat = Pattern
      .begin[Stock]("start")
      .where(_.Adj_Close > CLOSE_THRESHOLD)


    val patternStream = CEP.pattern(keyedStream, pat)

    val result = patternStream.select(
      patternSelectFunction = new PatternSelectFunction[Stock, String]() {
        override def select(pattern: util.Map[String, util.List[Stock]]): String = {
          val data = pattern.get("first").get(0)

          data.toString
        }
      }
    )

    result.print()

    see.execute("ASK Flink Kafka")

  }

  case class Stock(date: String,
                   Adj_Close: Double)
  {
    override def toString: String = s"Stock date: $date, Adj Close: $Adj_Close"
  }

}

Data coming from Kafka is in the string format "date:value".
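As a standalone illustration of that record format (the `parseStock` helper below is hypothetical and not part of the job), a defensive version of the split-based parsing could look like this; it skips malformed records instead of throwing inside the map operator:

```scala
// Mirrors the job's Stock case class.
case class Stock(date: String, adjClose: Double)

// Hypothetical helper: parse a "date:value" record, returning None for
// records that don't split into exactly two fields or whose value is
// not a valid Double.
def parseStock(record: String): Option[Stock] =
  record.split(":") match {
    case Array(date, value) =>
      scala.util.Try(value.toDouble).toOption.map(Stock(date, _))
    case _ => None
  }
```

For example, `parseStock("2021-03-01:142.5")` yields `Some(Stock("2021-03-01", 142.5))`, while a malformed record yields `None`.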

Scala version: 2.11.12, Flink version: 1.12.2, Kafka version: 2.3.0

I'm building the project with sbt assembly and then deploying the jar via the Flink dashboard.


Solution

  • With pattern.get("first") you are selecting a pattern named "first" from the pattern sequence, but the sequence only contains one pattern, and it is named "start". Try changing "first" to "start".

    Also, CEP has to be able to sort the stream into temporal order before it can do pattern matching, so you should define a watermark strategy. For processing-time semantics you can use WatermarkStrategy.noWatermarks().
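Putting both fixes together, the relevant part of the job could look like the sketch below (against the Flink 1.12 API; it is a fragment of the original job, not a standalone program, and reuses the question's keyedStream, pat, and Stock):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy

// Declare a watermark strategy so CEP can order the stream;
// noWatermarks() states the stream carries no event-time watermarks.
val withWatermarks: DataStream[Stock] =
  keyedStream.assignTimestampsAndWatermarks(
    WatermarkStrategy.noWatermarks[Stock]()
  )

val patternStream = CEP.pattern(withWatermarks, pat)

val result: DataStream[String] = patternStream.select(
  new PatternSelectFunction[Stock, String]() {
    override def select(pattern: util.Map[String, util.List[Stock]]): String =
      // "start" matches the name given in Pattern.begin[Stock]("start")
      pattern.get("start").get(0).toString
  }
)
```

The key change is that the name passed to pattern.get must match the name used when the pattern was declared with Pattern.begin.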