Tags: java, apache-kafka, apache-flink, flink-streaming

Flink - serialize a POJO to a Kafka sink


The structure of my Flink job is: read data in from Kafka (topic_1_in) -> deserialize messages -> map -> manipulate the data -> produce a POJO -> serialize the message -> write data out to Kafka (topic_1_out)

I'm now on the last stage where I would like to serialize my POJO. I have found the following example on the Flink website:

DataStream<String> stream = ...;

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-topic",                  // target topic
        new SimpleStringSchema(),    // serialization schema
        properties,                  // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

stream.addSink(myProducer);

But I don't understand how to implement a serialization schema.

I have also read about different possibilities:

https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html

But I'm still a bit confused about how to convert my POJO into a string to feed the Kafka sink. The class is really simple, so I assume it is quite straightforward.

import java.util.ArrayList;

import org.apache.flink.api.java.tuple.Tuple3;

public class POJO_block {
    public Double id;
    public Double tr_p;
    public Integer size;
    public Double last_info;
    public Long millis_last;
    private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}

Any example would be really appreciated.

Thanks.


Solution

  • The link mentioned in the question refers to Flink's internal serialization, which is used when Flink needs to ship some of our data from one part of the cluster to another; it is not relevant when writing to Kafka.

    When Flink interacts with external storage, like Kafka, it relies on a connector, and how serialization happens when doing so depends on the configuration details of that connector as well as on specific mechanisms of the underlying external storage (e.g. concepts like key and value in the case of Kafka records).

    In the case you describe, because your program uses the DataStream API and communicates with Kafka, the connector you're using is the Kafka DataStream connector, and its documentation is here.

    In the code you provided, this parameter of the FlinkKafkaProducer sink specifies how the serialization happens:

    // this is probably not what you want:
    new SimpleStringSchema(),    // serialization schema
    

    This configuration does not work because SimpleStringSchema expects strings as input, so a stream of POJO_block will make it fail.

    Instead, you can pass any implementation of org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema. That interface contains one main method, which lets you define the bytes of both the Kafka key and the value corresponding to each POJO_block instance (i.e. T below); a possible implementation is sketched right after the signature:

    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
    
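    For example, here is a minimal sketch of such an implementation, assuming you want to encode each POJO_block as JSON using Jackson's ObjectMapper and don't need a Kafka key; the class name POJOBlockSerializationSchema is made up for illustration. Note that with Jackson's defaults only public fields or fields with getters end up in the JSON, so the private list_val would need a getter to be included.

    import javax.annotation.Nullable;

    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import com.fasterxml.jackson.databind.ObjectMapper;

    // Hypothetical schema turning each POJO_block into a JSON-encoded Kafka value with no key.
    public class POJOBlockSerializationSchema implements KafkaSerializationSchema<POJO_block> {

        private final String topic;
        private transient ObjectMapper objectMapper;

        public POJOBlockSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(POJO_block element, @Nullable Long timestamp) {
            if (objectMapper == null) {
                // created lazily: ObjectMapper is not serializable,
                // while the schema itself is shipped to the task managers
                objectMapper = new ObjectMapper();
            }
            try {
                byte[] value = objectMapper.writeValueAsBytes(element);
                // null key: the producer spreads records over partitions;
                // pass a byte[] key instead if you need key-based partitioning
                return new ProducerRecord<>(topic, null, value);
            } catch (Exception e) {
                throw new RuntimeException("Could not serialize record: " + element, e);
            }
        }
    }

    You would then wire it into the sink with the FlinkKafkaProducer constructor that accepts a KafkaSerializationSchema (blockStream below stands for your DataStream<POJO_block>):

    FlinkKafkaProducer<POJO_block> myProducer = new FlinkKafkaProducer<>(
            "topic_1_out",                                      // default target topic
            new POJOBlockSerializationSchema("topic_1_out"),    // serialization schema
            properties,                                         // producer config
            FlinkKafkaProducer.Semantic.EXACTLY_ONCE);          // fault-tolerance

    blockStream.addSink(myProducer);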

    Note that if you were using the Table API to read from and write to Kafka instead of the DataStream API, this connector would be used instead; it has a convenient format configuration with ready-to-use formats like csv, json, avro, Debezium... (a small sketch follows below).
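    With that Table API connector, serialization is handled by the declared format rather than by code you write. A minimal, hypothetical sketch (the table definition mirrors a subset of your POJO fields, and the broker address is a placeholder; the exact EnvironmentSettings calls depend on your Flink version) could look like this:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TableApiKafkaSketch {
        public static void main(String[] args) {
            TableEnvironment tableEnv =
                    TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

            // The 'json' format serializes every row inserted into this table to JSON on topic_1_out.
            tableEnv.executeSql(
                    "CREATE TABLE topic_1_out (" +
                    "  id DOUBLE," +
                    "  tr_p DOUBLE," +
                    "  `size` INT," +
                    "  last_info DOUBLE," +
                    "  millis_last BIGINT" +
                    ") WITH (" +
                    "  'connector' = 'kafka'," +
                    "  'topic' = 'topic_1_out'," +
                    "  'properties.bootstrap.servers' = 'localhost:9092'," +
                    "  'format' = 'json'" +
                    ")");
        }
    }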