
SerializationSchema open method was not called


I implemented a SerializationSchema for KinesisStreamsSink and initialized it in the open method. However, Flink never actually calls open. Does anybody know about this issue?

Because my SerializationSchema implementation depends on a class that is not Serializable, I have to initialize that dependency at runtime in the open method.
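Why the dependency has to be created at runtime: Flink ships the sink, including its SerializationSchema, to the task managers using Java serialization, so any regular field holding a non-serializable object makes that step fail. The stdlib-only sketch below shows the difference between an eagerly created field and a transient one that is filled in later. Helper, EagerSchema, DeferredSchema and SerializationCheck are hypothetical names chosen for illustration; they are not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical dependency that does NOT implement Serializable.
class Helper {}

// Holds the helper in a regular field: Java serialization of this object fails.
class EagerSchema implements Serializable {
    private final Helper helper = new Helper();
}

// Marks the field transient so it is skipped during serialization;
// it stays null until it is created at runtime on the task manager.
class DeferredSchema implements Serializable {
    private transient Helper helper;
}

class SerializationCheck {
    // Returns true if obj can be written with Java serialization,
    // the mechanism used to ship user functions and schemas to task managers.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false; // some reachable, non-transient field is not Serializable
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

With these classes, SerializationCheck.isJavaSerializable(new EagerSchema()) returns false and SerializationCheck.isJavaSerializable(new DeferredSchema()) returns true, which is why the question's schema keeps its dependency out of the serialized state and relies on open to create it.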

I set up the KinesisStreamsSink following the Flink documentation, like this:

KinesisStreamsSink<String> kdsSink =
    KinesisStreamsSink.<String>builder()
        .setKinesisClientProperties(sinkProperties)                               // Required
        .setSerializationSchema(new CustomizedSchema())                         // Required
        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))  // Required
        .setStreamName("your-stream-name")                                        // Required
        .setFailOnError(false)                                                    // Optional
        .setMaxBatchSize(500)                                                     // Optional
        .setMaxInFlightRequests(50)                                               // Optional
        .setMaxBufferedRequests(10_000)                                           // Optional
        .setMaxBatchSizeInBytes(5 * 1024 * 1024)                                  // Optional
        .setMaxTimeInBufferMS(5000)                                               // Optional
        .setMaxRecordSizeInBytes(1 * 1024 * 1024)                                 // Optional
        .build();

In this code, CustomizedSchema is never properly initialized, because its open method is not called.


Solution

  • This is a bug in Flink 1.15. It is resolved in 1.16 with this PR.

    The proposed workaround for Flink 1.15 is a null check with lazy initialization: create the non-serializable dependency on first use inside serialize() instead of relying on open(). See the discussion on the Flink user mailing list.
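A minimal sketch of the null-check-and-lazy-initialization workaround follows. To keep it runnable without Flink on the classpath, it uses a stand-in interface, SchemaSketch, that only mirrors the shape of Flink's SerializationSchema; in a real job you would implement org.apache.flink.api.common.serialization.SerializationSchema<String> and override open(SerializationSchema.InitializationContext context) instead. NonSerializableEncoder is a hypothetical placeholder for the real non-serializable dependency.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;

// Stand-in for Flink's SerializationSchema<T> (assumption: only the shape matters here).
// Flink's real open() takes a SerializationSchema.InitializationContext argument.
interface SchemaSketch<T> extends Serializable {
    default void open() throws Exception {}
    byte[] serialize(T element);
}

// Hypothetical non-serializable dependency.
class NonSerializableEncoder {
    byte[] encode(String s) {
        return ("encoded:" + s).getBytes(StandardCharsets.UTF_8);
    }
}

class CustomizedSchema implements SchemaSketch<String> {
    // transient: not part of the serialized schema, so it is null after deserialization
    private transient NonSerializableEncoder encoder;

    @Override
    public void open() {
        // Still initialize here for versions where the sink does call open().
        initIfNeeded();
    }

    @Override
    public byte[] serialize(String element) {
        // Workaround for 1.15: open() may never be called, so initialize on first use.
        initIfNeeded();
        return encoder.encode(element);
    }

    private void initIfNeeded() {
        if (encoder == null) {
            encoder = new NonSerializableEncoder();
        }
    }
}
```

The unsynchronized null check assumes, as is typical, that each parallel sink instance works on its own deserialized copy of the schema and calls serialize() from a single task thread. If one instance could be shared across threads, the initialization would need synchronization.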