The structure of my Flink code is: get data in from Kafka (topic_1_in) -> deserialize the messages -> map -> manipulate the data -> get a POJO -> serialize the message -> send the data out with Kafka (topic_1_out)
I'm now at the last stage, where I would like to serialize my POJO. I have found the following example on the Flink website:
DataStream<String> stream = ...
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<>(
        "my-topic",                 // target topic
        new SimpleStringSchema(),   // serialization schema
        properties,                 // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

stream.addSink(myProducer);
But I don't understand how to implement the serialization schema.
I also read about different possibilities:
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html
But I'm still a bit confused about how to convert my POJO to a string to feed the Kafka sink. The class is really simple, so I assume it is quite straightforward.
public class POJO_block {
    public Double id;
    public Double tr_p;
    public Integer size;
    public Double last_info;
    public Long millis_last;
    private ArrayList<Tuple3<Integer, Integer, Integer>> list_val;
}
Any example would be really appreciated.
thanks
The link mentioned in the question refers to Flink's internal serialization, which is used when Flink needs to ship some of our data from one part of the cluster to another; it is not relevant when writing to Kafka.
When Flink interacts with external storage, like Kafka, it relies on a connector, and how serialization happens there depends on the configuration details of that connector as well as on specific mechanisms of the underlying storage (e.g. concepts like key and value in the case of Kafka records).
In the case you describe, because your program uses the DataStream API and communicates with Kafka, the connector you're using is the Kafka DataStream connector, and its documentation is here.
In the code you provided, this parameter of the FlinkKafkaProducer sink specifies how the serialization happens:
// this is probably not what you want:
new SimpleStringSchema(), // serialization schema
This configuration does not work because SimpleStringSchema expects strings as input, so a stream of POJO_block will make it fail.
Instead, you can pass any implementation of org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema. It contains one main function, which lets you define the byte values of both the Kafka key and value corresponding to each POJO_block instance (i.e. T below):
ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);
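As an illustration, here is a minimal sketch of such an implementation for POJO_block, assuming the value is encoded as JSON with Jackson's ObjectMapper; the topic name "topic_1_out" and the choice of id as the Kafka key are just assumptions for the example, not something the connector requires:

import java.nio.charset.StandardCharsets;
import javax.annotation.Nullable;

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.fasterxml.jackson.databind.ObjectMapper;

public class PojoBlockSerializationSchema implements KafkaSerializationSchema<POJO_block> {

    private static final String TOPIC = "topic_1_out"; // assumed target topic

    // created lazily so the schema object itself stays serializable when Flink ships it to the workers
    private transient ObjectMapper mapper;

    @Override
    public ProducerRecord<byte[], byte[]> serialize(POJO_block element, @Nullable Long timestamp) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            // use the id as the Kafka key and the whole POJO, encoded as JSON, as the value
            byte[] key = String.valueOf(element.id).getBytes(StandardCharsets.UTF_8);
            byte[] value = mapper.writeValueAsBytes(element);
            return new ProducerRecord<>(TOPIC, key, value);
        } catch (Exception e) {
            throw new RuntimeException("Could not serialize record: " + element, e);
        }
    }
}

The sink would then be created with the POJO type instead of String, along these lines:

FlinkKafkaProducer<POJO_block> myProducer = new FlinkKafkaProducer<>(
        "topic_1_out",                        // default target topic
        new PojoBlockSerializationSchema(),   // serialization schema
        properties,                           // producer config
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

blockStream.addSink(myProducer);

Note that Jackson only picks up public fields and getters by default, so the private list_val field would need a getter (or a visibility configuration) to appear in the JSON.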
Note that if you were using the Table API to read from and write to Kafka instead of the DataStream API, this connector would be used instead; it has a convenient format configuration with ready-to-use formats like CSV, JSON, Avro, Debezium...
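For illustration only, a Kafka sink table with one of those ready-made formats could be declared roughly like this (table and column names are placeholders, not part of your program):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// the 'format' option replaces the hand-written serialization schema of the DataStream connector
tEnv.executeSql(
    "CREATE TABLE blocks_out (" +
    "  id DOUBLE," +
    "  tr_p DOUBLE," +
    "  `size` INT," +
    "  last_info DOUBLE," +
    "  millis_last BIGINT" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'topic_1_out'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'format' = 'json'" +
    ")");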