Tags: scala, join, stream, apache-flink

Scala Flink Join between two streams doesn't work


I would like to join two streams coming from a Kafka producer, but the join does not work. I use an AssignerWithPeriodicWatermarks to define my timestamp assigner, and I try to join the two streams using 3 min windows, but I don't get any output. I printed the two streams to make sure they contain events that are close enough together in time.

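import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import spray.json._

object Job {

  class Assigner extends AssignerWithPeriodicWatermarks[String] {
    // 1 s in ms
    val bound: Long = 1000
    // the maximum observed timestamp
    var maxTs: Long = Long.MinValue

    override def getCurrentWatermark: Watermark = {
      new Watermark(maxTs - bound)
    }

    // previousTS is the timestamp already attached to the record (here, the Kafka record timestamp)
    override def extractTimestamp(r: String, previousTS: Long): Long = {
      maxTs = Math.max(maxTs, previousTS)
      previousTS
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // createLocalEnvironment()
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9093")
    properties.setProperty("group.id", "test")

    val consumerId = new FlinkKafkaConsumer[String]("topic_id", new SimpleStringSchema(), properties)
    val streamId = env.addSource(consumerId).assignTimestampsAndWatermarks(new Assigner)

    // (id, values of fields "m" and "w")
    val streamIdParsed = streamId
      .map { s => s.parseJson }
      .map { value =>
        (value.asJsObject.getFields("id")(0).toString(), value.asJsObject.getFields("m", "w"))
      }

    val consumerV = new FlinkKafkaConsumer[String]("topic_invoice", new SimpleStringSchema(), properties)
    val streamV = env.addSource(consumerV).assignTimestampsAndWatermarks(new Assigner)

    // (id, values of fields "id2" and "id3" in the first entry of "products")
    val streamVParsed = streamV
      .map { s => s.parseJson }
      .map { value =>
        (value.asJsObject.getFields("id")(0).toString(),
         value.asJsObject.getFields("products")(0).toString().parseJson.asJsObject.getFields("id2", "id3"))
      }

    // 60 s event-time windows sliding every 1 s, joined on the id
    streamIdParsed.join(streamVParsed)
      .where(_._1)
      .equalTo(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(1)))
      .apply { (e1, e2) => (e1._1, "test") }
      .print()

    // The job graph only runs once execute() is called
    env.execute()
  }
}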
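A related detail in the Assigner above is its arithmetic before any record has been read: maxTs starts at Long.MinValue, and in Scala's Long arithmetic Long.MinValue - bound wraps around to a large positive number, so getCurrentWatermark returns a huge watermark until the first record updates maxTs. Whether that affects the join here depends on when Flink first polls the assigner relative to the first Kafka record, so treat it as something to check rather than a confirmed cause. The plain-Scala model below (no Flink classes; the object and value names are hypothetical) reproduces only that arithmetic. Starting maxTs at Long.MinValue + bound is, as far as I know, the same initialisation Flink's own BoundedOutOfOrdernessTimestampExtractor uses.

```scala
// Plain-Scala model of the watermark arithmetic in the question's Assigner.
// No Flink classes are used; names here are illustrative only.
object WatermarkOverflowDemo {
  // Same bound as the question's Assigner: 1 s in ms.
  val bound: Long = 1000L

  // Same formula as Assigner.getCurrentWatermark: maxTs - bound.
  def watermark(maxTs: Long): Long = maxTs - bound

  // Initial value used in the question: subtracting bound from it overflows.
  val naiveInitialMaxTs: Long = Long.MinValue

  // Guarded initial value: maxTs - bound then evaluates to exactly Long.MinValue.
  val guardedInitialMaxTs: Long = Long.MinValue + bound

  def main(args: Array[String]): Unit = {
    // Before any record is seen, the naive start yields a huge positive watermark.
    println(watermark(naiveInitialMaxTs))   // 9223372036854774808
    // The guarded start yields the smallest possible watermark instead.
    println(watermark(guardedInitialMaxTs)) // -9223372036854775808
  }
}
```

Once a real record timestamp has been folded into maxTs, both variants produce the same watermark; the difference only shows up before the first record.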

Solution

  • The issue is that you haven't set the autoWatermarkInterval while using a periodic assigner (AssignerWithPeriodicWatermarks). You need to do the following:

    env.getConfig.setAutoWatermarkInterval(someInterval) // interval in milliseconds

    This should fix the issue of watermarks not being generated.
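A slightly fuller sketch of the one-liner above, assuming the Flink 1.x Scala DataStream API the question uses (where setStreamTimeCharacteristic still exists). The interval controls how often Flink calls getCurrentWatermark on an AssignerWithPeriodicWatermarks. The object name, the configure helper, and the 200 ms value are illustrative choices, not something the answer specifies; setting the interval explicitly simply makes it visible in the code regardless of which defaults a given Flink version applies.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

// Sketch: configure an event-time job so a periodic watermark assigner is polled.
// WatermarkIntervalSetup and configure are hypothetical names for illustration.
object WatermarkIntervalSetup {
  // Illustrative interval in milliseconds.
  val intervalMillis: Long = 200L

  def configure(env: StreamExecutionEnvironment): StreamExecutionEnvironment = {
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // How often (in ms) Flink asks periodic assigners for a new watermark.
    env.getConfig.setAutoWatermarkInterval(intervalMillis)
    env
  }

  def main(args: Array[String]): Unit = {
    val env = configure(StreamExecutionEnvironment.getExecutionEnvironment)
    // Read the value back to confirm what the job will use.
    println(env.getConfig.getAutoWatermarkInterval)
  }
}
```

In the question's main, configure(env) would take the place of the bare env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) line, before the Kafka sources are added.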