Flink interval join does not produce any output


We have a Flink job that performs an intervalJoin on two streams, both of which consume events from Kafka. Here is the example code:

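import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Both sources read from Kafka; timestamps and watermarks are assigned right after each source.
val articleEventStream: DataStream[ArticleEvent] = env.addSource(articleEventSource)
  .assignTimestampsAndWatermarks(new ArticleEventAssigner)
val feedbackEventStream: DataStream[FeedbackEvent] = env.addSource(feedbackEventSource)
  .assignTimestampsAndWatermarks(new FeedbackEventAssigner)

articleEventStream
  .keyBy(article => article.id)
  .intervalJoin(feedbackEventStream.keyBy(feedback => feedback.article.id))
  // Joins feedback whose timestamp lies between 5 s before and 10 s after the article's timestamp.
  .between(Time.seconds(-5), Time.seconds(10))
  .process(new ProcessJoinFunction[ArticleEvent, FeedbackEvent, String] {
    override def processElement(left: ArticleEvent, right: FeedbackEvent, ctx: ProcessJoinFunction[ArticleEvent, FeedbackEvent, String]#Context, out: Collector[String]): Unit = {
      out.collect(left.name + " got feedback: " + right.feedback)
    }
  })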

class ArticleEventAssigner extends AssignerWithPunctuatedWatermarks[ArticleEvent] {
  val bound: Long = 5 * 1000

  override def checkAndGetNextWatermark(lastElement: ArticleEvent, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp - bound)
  }

  override def extractTimestamp(element: ArticleEvent, previousElementTimestamp: Long): Long = {
    element.occurredAt
  }
}

class FeedbackEventAssigner extends AssignerWithPunctuatedWatermarks[FeedbackEvent] {
  val bound: Long = 5 * 1000

  override def checkAndGetNextWatermark(lastElement: FeedbackEvent, extractedTimestamp: Long): Watermark = {
    new Watermark(extractedTimestamp - bound)
  }

  override def extractTimestamp(element: FeedbackEvent, previousElementTimestamp: Long): Long = {
    element.occurredAt
  }
}

However, we do not see any joined output. We verified that each stream continuously emits elements with timestamps and proper watermarks. Does anyone have a hint as to what the cause might be?


Solution

  • After checking different parts (timestamps/watermarks, triggers), I noticed that I had made a mistake: the join interval I used,

    between(Time.seconds(-5), Time.seconds(10))

    was simply too small, so no pair of elements from the two streams fell within it and nothing could be joined. This might sound obvious, but since I am new to Flink, I did not know where to look. My lesson is that if an interval join produces no output, it is worth checking whether the interval bounds actually cover the time gap between matching events. Thanks, all, for the comments!
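To make the point about the interval concrete, here is a minimal sketch in plain Scala (no Flink dependency) of the matching condition that Flink documents for interval joins: a right-side element joins a left-side element when its timestamp lies in [left timestamp + lower bound, left timestamp + upper bound], with both bounds inclusive by default. The object name, the helper `withinInterval`, and all timestamps are hypothetical and chosen only for illustration; in particular, the one-minute gap between article and feedback is an assumption, not a figure from the question.

```scala
object IntervalJoinCheck {
  // Hypothetical helper, not part of Flink: mirrors the documented condition
  // leftTs + lowerMs <= rightTs <= leftTs + upperMs (bounds inclusive).
  def withinInterval(leftTs: Long, rightTs: Long, lowerMs: Long, upperMs: Long): Boolean =
    rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs

  def main(args: Array[String]): Unit = {
    val articleTs  = 1000000L              // made-up event time in milliseconds
    val feedbackTs = articleTs + 60000L    // assumed: feedback arrives one minute later

    // Bounds from the question, between(Time.seconds(-5), Time.seconds(10)): no match.
    println(withinInterval(articleTs, feedbackTs, -5000L, 10000L))   // false

    // A wider, purely illustrative upper bound of two minutes: the pair matches.
    println(withinInterval(articleTs, feedbackTs, -5000L, 120000L))  // true
  }
}
```

Running the same check against a few real `occurredAt` pairs from both topics is one way to see whether the chosen bounds can ever match before looking at watermarks or triggers.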