I implemented a SerializationSchema for KinesisStreamsSink and initialized it in the open method, but I found that Flink never actually calls open. Does anybody know about this issue?
Because my SerializationSchema implementation depends on a class that is not Serializable, I have to initialize that dependency in the open method at runtime.
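To see why such a dependency has to be created at runtime, the following self-contained Java sketch round-trips two objects through plain Java serialization, which is roughly what happens when a schema instance is shipped to the task managers. NonSerializableClient, EagerSchema and TransientSchema are hypothetical stand-ins, not Flink classes, and no Flink dependency is needed to run it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class TransientFieldDemo {

    // Hypothetical stand-in for the non-Serializable dependency from the question.
    static class NonSerializableClient {
        byte[] encode(String s) {
            return s.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Holds the client in a normal field, so Java serialization fails.
    static class EagerSchema implements Serializable {
        private final NonSerializableClient client = new NonSerializableClient();
    }

    // Marks the client transient: serialization succeeds, but field initializers
    // are not re-run on deserialization, so the field comes back as null.
    static class TransientSchema implements Serializable {
        transient NonSerializableClient client = new NonSerializableClient();
    }

    // Serializes and deserializes an object in memory.
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    // Returns true if serializing EagerSchema throws NotSerializableException.
    static boolean eagerFails() throws Exception {
        try {
            roundTrip(new EagerSchema());
            return false;
        } catch (NotSerializableException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("EagerSchema fails to serialize: " + eagerFails());
        TransientSchema copy = roundTrip(new TransientSchema());
        System.out.println("client after round trip: " + copy.client);
    }
}
```

Running main reports that EagerSchema fails with NotSerializableException and that the transient client is null after the round trip. That null field is what the sink ends up using if nothing re-creates it on the worker side.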
I used KinesisStreamsSink following the guide in the Flink documentation, which looks like this:
KinesisStreamsSink<String> kdsSink =
KinesisStreamsSink.<String>builder()
.setKinesisClientProperties(sinkProperties) // Required
.setSerializationSchema(new CustomizedSchema()) // Required
.setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) // Required
.setStreamName("your-stream-name") // Required
.setFailOnError(false) // Optional
.setMaxBatchSize(500) // Optional
.setMaxInFlightRequests(50) // Optional
.setMaxBufferedRequests(10_000) // Optional
.setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional
.setMaxTimeInBufferMS(5000) // Optional
.setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional
.build();
In this code, CustomizedSchema was not properly initialized because its open method was never called.
This is a bug in Flink 1.15. It is resolved in 1.16 with this PR.
The proposed workaround for Flink 1.15 is a null-check with lazy initialization: create the non-serializable dependency on first use inside serialize() instead of relying on open(). See the discussion on the Flink user mailing list.
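A minimal sketch of that null-check and lazy initialization, under a few assumptions: NonSerializableClient is a hypothetical stand-in for the real dependency, and CustomizedSchema only implements java.io.Serializable here so the example runs without Flink on the classpath. In a real job the class would implement org.apache.flink.api.common.serialization.SerializationSchema<String>, whose serialize(T) method has the same shape. The sketch also assumes serialize() is not called concurrently on the same instance, so it uses no locking.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

public class LazyInitSchemaDemo {

    // Hypothetical stand-in for the non-Serializable dependency.
    static class NonSerializableClient {
        byte[] encode(String s) {
            return s.getBytes(StandardCharsets.UTF_8);
        }
    }

    // In a real job: implements SerializationSchema<String>.
    // Here it only implements Serializable so the sketch compiles without Flink.
    static class CustomizedSchema implements Serializable {
        private static final long serialVersionUID = 1L;

        // transient: not shipped with the schema, so it must be built on the worker.
        private transient NonSerializableClient client;

        // Counts how often the client was created (for demonstration only).
        int initCount;

        // Null-check plus lazy initialization: works whether or not open() ran.
        private NonSerializableClient client() {
            if (client == null) {
                client = new NonSerializableClient();
                initCount++;
            }
            return client;
        }

        public byte[] serialize(String element) {
            return client().encode(element);
        }
    }

    public static void main(String[] args) {
        CustomizedSchema schema = new CustomizedSchema();
        System.out.println(new String(schema.serialize("hello"), StandardCharsets.UTF_8));
        schema.serialize("world");
        System.out.println("initializations: " + schema.initCount);
    }
}
```

The client is created once, on the first call to serialize(), and reused afterwards. On 1.16 and later, where the answer says the bug is fixed, open() can still call the same client() helper to initialize eagerly; the null-check then simply finds the field already set.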