Flink Scala join between two Streams doesn't seem to work

I want to join two streams (JSON) coming from a Kafka producer. The code works if I only filter the data, but it does not seem to work when I join the streams. I want to print the joined stream to the console, but nothing appears. This is my code:

import java.util.Properties 
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.json4s._
import org.json4s.native.JsonMethods
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object App {

def main(args: Array[String]): Unit = {

case class Data(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Data, stt: Stt)

case class Datas(location: String, timestamp: Long, measurement: Int, unit: String, accuracy: Double)
case class Sensor2(sensor_name: String, start_date: String, end_date: String, data_schema: Array[String], data: Datas, stt: Stt)

val properties = new Properties();
    properties.setProperty("bootstrap.servers", "");
    properties.setProperty("", "test");

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val consumer1 = new FlinkKafkaConsumer010[String]("topics1", new SimpleStringSchema(), properties)
   val stream1 = env

   val consumer2 = new FlinkKafkaConsumer010[String]("topics2", new SimpleStringSchema(), properties)
   val stream2 = env

   val s1 = { x => {
     implicit val formats = DefaultFormats
   val s2 = { x => {
     implicit val formats = DefaultFormats

  val s1t = s1.assignAscendingTimestamps { x => }
  val s2t = s2.assignAscendingTimestamps { x => }

  val j1pre = s1t.join(s2t)
              .apply((g, s) => (s.sensor_name, g.sensor_name,



I think the problem is in the assignment of the timestamps. I suspect that assignAscendingTimestamps on the two sources is not the right function to use.

The JSON produced by the Kafka producer has a field data.timestamp that should be used as the event timestamp, but I don't know how to set that up.

I also thought that I might have to assign a time window to the incoming tuples (like the batch interval in Spark), but I'm not sure this is the right solution.


  • I think your code needs only a few minor adjustments. First of all, since you want to work in event time, you should set the TimeCharacteristic of the execution environment to EventTime.
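
    A minimal sketch of that setting, assuming a Flink 1.x version in which setStreamTimeCharacteristic is still available (the object name is made up for illustration):

    ```scala
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._

    object EventTimeSetupSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Use the timestamps carried by the records (event time)
        // instead of the default processing time.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // ... build the sources, the join and the sink here, then call env.execute()
      }
    }
    ```

    This call has to come before the streams are built, so that the event-time windows of the join are driven by the assigned timestamps and watermarks.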


    Also, the code you pasted is missing a sink for the stream. If you want to print to the console, call print() on the joined stream, and make sure the job is actually started with env.execute().
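
    As a rough sketch, with a small in-memory stream standing in for your joined stream (j1pre in the question); the element values, the object name and the job name are placeholders:

    ```scala
    import org.apache.flink.streaming.api.scala._

    object PrintSinkSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Placeholder for the result of the join.
        val joined: DataStream[(String, String)] =
          env.fromElements(("sensor-a", "sensor-b"))
        // print() registers a sink that writes every record to stdout.
        joined.print()
        // Nothing runs until the job is submitted with execute().
        env.execute("print-sink-sketch")
      }
    }
    ```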


    The rest of your code seems fine.
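
    For reference, here is a self-contained sketch of how the pieces could fit together. It is not your exact job: the Reading case class, the sample values, the join key and the 3-second window are assumptions made for illustration, and in-memory sources replace the Kafka consumers. It also assumes the timestamp field holds epoch milliseconds that never decrease within a source, which is what assignAscendingTimestamps expects.

    ```scala
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Hypothetical, simplified record; the real job would parse it from JSON.
    case class Reading(sensorName: String, timestamp: Long, measurement: Int)

    object WindowedJoinSketch {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

        // In-memory stand-ins for the two Kafka topics.
        val s1 = env
          .fromElements(Reading("a", 1000L, 1), Reading("b", 2000L, 2))
          .assignAscendingTimestamps(_.timestamp) // event time taken from the record

        val s2 = env
          .fromElements(Reading("a", 1500L, 10), Reading("b", 2500L, 20))
          .assignAscendingTimestamps(_.timestamp)

        // Join records with the same sensor name that fall into the same
        // tumbling event-time window.
        val joined = s1.join(s2)
          .where(_.sensorName)
          .equalTo(_.sensorName)
          .window(TumblingEventTimeWindows.of(Time.seconds(3)))
          .apply((left, right) => (left.sensorName, left.measurement, right.measurement))

        joined.print()
        env.execute("windowed-join-sketch")
      }
    }
    ```

    A window join only emits results once the watermark passes the end of the window, so with a live Kafka source both streams need to keep delivering records with advancing timestamps before anything is printed.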