scala apache-kafka apache-flink flink-streaming

Scala String Sink for FlinkKafkaProducer extending KafkaSerializationSchema


[Flink version -- 1.9]

So I am trying to sink a string (in JSON format) to a Kafka topic and am a bit stuck on how to implement a KafkaSerializationSchema to sink a string. It doesn't seem that a SimpleStringSchema will work with the FlinkKafkaProducer, because it expects a KafkaSerializationSchema.

If there is already a utility like SimpleStringSchema for Kafka, I would prefer that. But if I have to write my own, can anyone explain why my Scala code, converted from a Java Stack Overflow post that does basically the same thing, doesn't override anything?

  import java.util.Properties
  import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer

  def defineKafkaDataSink(topic: String,
                          kafkaBootstrapServer: String = "localhost:9092"): FlinkKafkaProducer[String] = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBootstrapServer)
    new FlinkKafkaProducer[String](topic, new ProducerStringSerializationSchema(topic), properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE)
  }


  // Not sure why serialize doesn't override anything.
  // Working from a Java Stack Overflow post:
  // https://stackoverflow.com/questions/58644549/how-to-implement-flinkkafkaproducer-serializer-for-kafka-2-2
  import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
  import org.apache.kafka.clients.producer.ProducerRecord
  import java.nio.charset.StandardCharsets

  class ProducerStringSerializationSchema(var topic: String) extends KafkaSerializationSchema[String] {
    override def serialize(element: String, timestamp: Long) = new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
  }

Solution

  • This is in fact more a Scala question than a Flink question. Make sure that the types exactly match the corresponding Java types. Scala has the habit of providing types that look very similar to the Java ones but are not the same type.

    override def serialize(element: String, timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
      new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
    

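    In particular, double-check that Long is the Java java.lang.Long and not the Scala Long. The Java interface declares the timestamp as a boxed (and nullable) java.lang.Long, whereas Scala's Long compiles to the primitive long, so a method taking a Scala Long has a different signature and therefore overrides nothing.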

    If you run into issues like this in the future, it's often easier to let your IDE generate the method stubs, because it "knows" the right signature. Just comment out your method and let your IDE complain that the class has unimplemented abstract methods; it will usually offer to add them for you.
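For reference, here is a minimal, self-contained sketch of the corrected schema. It assumes Flink 1.9's universal Kafka connector (flink-connector-kafka, which pulls in kafka-clients) is on the classpath. The SchemaDemo object and the topic name "events" are hypothetical, added only to show that serialize can be called directly, even with a null timestamp, once the parameter is a java.lang.Long.

```scala
import java.nio.charset.StandardCharsets

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema
import org.apache.kafka.clients.producer.ProducerRecord

// Writes each String element as the UTF-8 encoded value of a Kafka record (no key).
class ProducerStringSerializationSchema(topic: String) extends KafkaSerializationSchema[String] {

  // The Java interface declares the timestamp as a boxed, nullable java.lang.Long.
  // Using Scala's Long here would produce a different signature ("overrides nothing").
  override def serialize(element: String,
                         timestamp: java.lang.Long): ProducerRecord[Array[Byte], Array[Byte]] =
    new ProducerRecord[Array[Byte], Array[Byte]](topic, element.getBytes(StandardCharsets.UTF_8))
}

// Hypothetical demo: calls the schema directly, without a Flink job or a Kafka broker.
object SchemaDemo {
  def main(args: Array[String]): Unit = {
    val schema = new ProducerStringSerializationSchema("events")
    // null is accepted because the parameter is the boxed java.lang.Long.
    val record = schema.serialize("""{"id": 1}""", null)
    println(record.topic())                                      // prints: events
    println(new String(record.value(), StandardCharsets.UTF_8))  // prints: {"id": 1}
  }
}
```

Like the code in the question, this sketch ignores the timestamp. If you want the Kafka record to carry the Flink element timestamp, ProducerRecord also has a five-argument constructor (topic, partition, timestamp, key, value) that takes the boxed timestamp directly, null included.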