
Flink CEP unknown error alerted by IntelliJ IDE


I started studying Apache Flink's CEP library in Scala. When I try to create a PatternStream by calling CEP.pattern(input, pattern), as shown in the tutorial at https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html, the IDE reports "Cannot resolve overloaded method" on the pattern method. I create the input with readTextFile and the pattern with Pattern.begin[String]("igual").where(_.length == 10), and judging by their implementations there shouldn't be any problem with the method's arguments or generic types.

Here is the code I wrote. I know it isn't complete, but I couldn't finish it because this problem came up.

package FlinkCEPClasses

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.cep.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

class FlinkCEPPipeline {

  var props : Properties = new Properties()

  var env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  var input : DataStream[String] = env.readTextFile("/home/luca/Desktop/lines")

  var patt : Pattern[String,String] = Pattern.begin[String]("igual").where(_.length == 10)

  // The problem appears on the following line. A red underline appears under the pattern method,
  // with the message: "Cannot resolve overloaded method"

  var CEPstream = CEP.pattern(input,patt)

  input.writeAsText("/home/luca/Desktop/flinkcepout",FileSystem.WriteMode.OVERWRITE)

  env.execute()
}


Here is the content of my build.sbt file:


name := "FlinkCEP"

version := "0.1"

scalaVersion := "2.12.10"

// https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-scala
libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"

// https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"

libraryDependencies += "log4j" % "log4j" % "1.2.17"

libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime

libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test

My purpose with this code is just to see a simple "where" condition running; it isn't meant to do anything more than that. I'm using IntelliJ as my IDE. Also, I'm not sure whether the Scala libraries for CEP are ready to be used. I would appreciate it if anyone could shed some light on this.


Solution

  • Well, I finally solved the problem after looking at @DavidAnderson's GitHub examples. Since I made many changes, I can't say for sure that my solution will work for you, but I changed import org.apache.flink.streaming.api.datastream.DataStream to import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}. My imports now also take CEP from org.apache.flink.cep.scala instead of org.apache.flink.cep, so every type involved comes from the Scala API. Watch out for ambiguous imports and make sure you're importing the classes you actually need.

    I'll list all my imports and my build.sbt file so that you can see my full configuration.

    Imports

    import java.util.Properties
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.cep.scala.pattern.Pattern
    import org.apache.flink.core.fs.FileSystem
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.flink.cep.scala.{CEP, PatternStream}
    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, DataStream, _}
    

    build.sbt

    name := "FlinkCEP"
    
    version := "0.1"
    
    scalaVersion := "2.12.10"
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
    //libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-cep
    libraryDependencies += "org.apache.flink" %% "flink-cep" % "1.9.0"
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-cep-scala
    libraryDependencies += "org.apache.flink" %% "flink-cep-scala" % "1.9.0"
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-runtime
    libraryDependencies += "org.apache.flink" %% "flink-runtime" % "1.9.0" % Test
    
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-scala
    libraryDependencies += "org.apache.flink" %% "flink-scala" % "1.9.0"
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala
    libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % "1.9.0"
    
    // https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka
    libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.9.0"
    
    libraryDependencies += "log4j" % "log4j" % "1.2.17"
    
    libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.1.3" % Runtime
    
    libraryDependencies += "org.slf4j" % "slf4j-simple" % "1.6.2" % Test
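
For reference, here is a minimal sketch of how the pipeline from the question might look once every type comes from the Scala API, assuming Flink 1.9.0 and the dependencies listed above. The Java org.apache.flink.cep.CEP.pattern expects a Java DataStream and a Java Pattern, so passing it a Scala Pattern matches none of its overloads, which is consistent with the error the IDE reported. The object name FlinkCEPSketch, the helper firstMatch, the job name, and the select step that writes out the matches are illustrative assumptions and were not in the original code; the file paths are the ones from the question. The time characteristic is left at its default here, since the input lines carry no timestamps.

```scala
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.core.fs.FileSystem
// Brings in the Scala StreamExecutionEnvironment and DataStream, plus the
// implicit TypeInformation that the Scala API methods require.
import org.apache.flink.streaming.api.scala._

object FlinkCEPSketch {

  // Hypothetical helper: returns the first event captured under the "igual" step.
  // Declared as a function value so that select resolves to its Scala-function overload.
  val firstMatch: scala.collection.Map[String, Iterable[String]] => String =
    matched => matched("igual").head

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Scala DataStream, because env is the Scala StreamExecutionEnvironment.
    val input: DataStream[String] = env.readTextFile("/home/luca/Desktop/lines")

    // Scala Pattern: matches every line that is exactly 10 characters long.
    val patt: Pattern[String, String] =
      Pattern.begin[String]("igual").where(_.length == 10)

    // Scala CEP.pattern: accepts the Scala DataStream and the Scala Pattern.
    val patternStream = CEP.pattern(input, patt)

    // Write the matched lines (not the raw input) to the output path.
    patternStream
      .select(firstMatch)
      .writeAsText("/home/luca/Desktop/flinkcepout", FileSystem.WriteMode.OVERWRITE)

    env.execute("flink-cep-sketch")
  }
}
```

If IntelliJ still flags CEP.pattern after the imports are changed, it may be worth checking that no leftover import of org.apache.flink.cep.CEP or org.apache.flink.streaming.api.datastream.DataStream shadows the Scala versions.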