
How to push a datastream to a Kafka topic while retaining the order, using the string method, in Flink and Kafka

Problem


I am trying to create a JSON record every 500 ms and push it to a Kafka topic so that I can set up some windows downstream and perform computations. Below is my code:

package KafkaAsSource

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer}
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper


import java.time.format.DateTimeFormatter
import java.time.LocalDateTime
import java.util.{Optional, Properties}

object PushingDataToKafka {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setMaxParallelism(256)
    env.enableCheckpointing(5000)
    val stream: DataStream[String] = env.fromElements(createData())

    stream.addSink(sendToTopic(stream))
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    return properties
  }

  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 1000
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      println(jsonData)
      Thread.sleep(500)
    }
    return jsonData
  }

  def sendToTopic(): Properties = {
    val producer = new FlinkKafkaProducer[String](
      "topic",
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
      getProperties(),
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE
    )
    return producer
  }
}

It gives me the following error:

type mismatch;
 found   : Any
 required: org.apache.flink.streaming.api.functions.sink.SinkFunction[String]
    stream.addSink(sendToTopic())

Modified Code:

object FlinkTest {

  def main(ars: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    env.setMaxParallelism(256)
    var stream = env.fromElements("")
    //env.enableCheckpointing(5000)
    //val stream: DataStream[String] = env.fromElements("hey mc", "1")

    val myProducer = new FlinkKafkaProducer[String](
      "maddy", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE)
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      println(a)
      Thread.sleep(500)
      stream = env.fromElements(jsonData)
      println(jsonData)
      stream.addSink(myProducer)
    }

    env.execute("hey")
  }

  def getProperties(): Properties = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    return properties
  }
  /*
  def createData(): String = {
    val minRange: Int = 0
    val maxRange: Int = 10
    var jsonData = ""
    for (a <- minRange to maxRange) {
      jsonData = "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" + DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) + "\"\n  \n}"
      Thread.sleep(500)
    }
    return jsonData
  }
  */

}

The modified code gets the data into the Kafka topic, but it doesn't retain the order. What am I doing wrong in the loop? Also, I had to change the Flink version from 1.13.5 to 1.12.2.

I was initially using Flink 1.13.5 with the connectors and Scala 2.11. What exactly am I missing here?


Solution

  • A couple of things about this loop:

    for (a <- minRange to maxRange) {
        jsonData =
          "{\n  \"id\":\"" + a + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" +
            DateTimeFormatter
              .ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
              .format(LocalDateTime.now) + "\"\n  \n}"
        println(a)
        Thread.sleep(500)
        stream = env.fromElements(jsonData)
        println(jsonData)
        stream.addSink(myProducer)
    }
    
    • The sleep is happening in the Flink client, and only affects how long it takes the client to assemble the job graph before submitting it to the cluster. It has no effect on how the job runs.

    • This loop creates a separate pipeline on each iteration (eleven of them, since 0 to 10 is inclusive), and they all run independently, in parallel, producing to the same Kafka topic. Those pipelines are going to race against each other.


    To get the behavior you're looking for (a global ordering across a single pipeline) you'll want to produce all of the events from a single source (in order, of course), and run the job with a parallelism of one. Something like this would do it:

    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    
    object FlinkTest {
    
      def main(ars: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val myProducer = ...
        val jsonData = (i: Long) => ...
    
        env.fromSequence(0, 9)
          .map(i => jsonData(i))
          .addSink(myProducer)
    
        env.execute()
      }
    }
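
    For completeness, here is one way the elided pieces could be filled in. It reuses the producer configuration and JSON format from the question (topic "maddy", SimpleStringSchema, EXACTLY_ONCE semantics), so treat it as a sketch under those assumptions rather than a drop-in implementation:

    import java.time.LocalDateTime
    import java.time.format.DateTimeFormatter
    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper

    object FlinkTest {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)         // a single pipeline, so element order is preserved
        env.enableCheckpointing(5000) // EXACTLY_ONCE delivery needs checkpointing enabled

        // Same producer setup as in the question.
        val myProducer = new FlinkKafkaProducer[String](
          "maddy",
          new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()),
          getProperties(),
          FlinkKafkaProducer.Semantic.EXACTLY_ONCE)

        // Build the JSON payload for a given id, as in the question's loop.
        val jsonData = (i: Long) =>
          "{\n  \"id\":\"" + i + "\",\n  \"Category\":\"Flink\",\n  \"eventTime\":\"" +
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now) +
            "\"\n}"

        // All of the events come from a single source and flow through one pipeline.
        env.fromSequence(0, 9)
          .map(i => jsonData(i))
          .addSink(myProducer)

        env.execute()
      }

      def getProperties(): Properties = {
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", "localhost:9092")
        properties
      }
    }

    With a parallelism of one, a single producer subtask writes the events in the order they are generated. Keep in mind that Kafka only guarantees ordering within a partition, so reading them back in order also assumes the topic has a single partition (or that every event lands in the same partition).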
    

    You can leave maxParallelism at 256 (or at its default value of 128); it's not particularly relevant here. The maxParallelism is the number of hash buckets that keyBy will hash the keys into, and it defines an upper limit on the scalability of the job.
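
    As a rough illustration of the difference between the two settings (the values here are only examples):

    // parallelism: how many parallel subtask instances actually run;
    // one here, so that a single task produces the events in order
    env.setParallelism(1)

    // maxParallelism: the number of key groups that keyBy hashes keys into;
    // it only caps how far a keyed job could later be rescaled
    env.setMaxParallelism(128)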