I am writing a Flink Streaming application that has a KafkaSink<>:
    return KafkaSink.<EventWatchRecordMeta>builder()
        .setBootstrapServers(applicationConfiguration.getSinkKafkaBrokers())
        .setKafkaProducerConfig(properties)
        .setRecordSerializer(new KafkaRecordSerializationSchema<EventWatchRecordMeta>() {
            @Override
            public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext)
                    throws Exception {
                KafkaRecordSerializationSchema.super.open(context, sinkContext);
            }

            @Override
            public ProducerRecord<byte[], byte[]> serialize(EventWatchRecordMeta record,
                    KafkaSinkContext kafkaSinkContext, Long aLong) {
                try {
                    StandardRecord message = record.getStandardRecord();
                    log.info("Producing record: {} to {} topic with key {}",
                            message.getData(), "output-topic", message.getKey());
                    return new ProducerRecord<>(record.getSinkTopic(), message.getKey(),
                            OBJECT_MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL)
                                    .writeValueAsString(message.getData()).getBytes());
                } catch (JsonProcessingException e) {
                    log.error("Exception! {}", e.getMessage(), e);
                    e.printStackTrace();
                    return null;
                }
            }
        }).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
}
When I try to submit this job, I get the following error:
Exception: org.apache.flink.api.common.InvalidProgramException: The implementation of the KafkaRecordSerializationSchema is not serializable. The implementation accesses fields of its enclosing class, which is a common reason for non-serializability. A common solution is to make the function a proper (non-inner) class, or a static inner class.
I'm not sure I understand what the issue is in my code.
The exception message already explains it. Flink has to serialize the schema in order to ship it to the workers, and that fails when the schema is defined as an anonymous class and, at the same time, "the implementation accesses fields of its enclosing class". Therefore, extract it into a separate top-level class or a static nested class (again, as stated in the exception itself).
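The underlying problem is that an anonymous class that uses fields of its enclosing instance holds a hidden reference to that instance. Serializing the schema therefore means serializing the enclosing object as well, and that object is not serializable here.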
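The effect can be reproduced with plain Java serialization, without Flink on the classpath. The sketch below is only an illustration under stated assumptions: `RecordSerializer`, `SinkFactory`, `StandaloneSerializer` and `SerializationDemo` are hypothetical names, and `RecordSerializer` merely stands in for `KafkaRecordSerializationSchema`, which is likewise `Serializable`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for KafkaRecordSerializationSchema: it extends
// Serializable so that implementations can be shipped to workers.
interface RecordSerializer<T> extends Serializable {
    byte[] serialize(T record);
}

// Hypothetical stand-in for the class that builds the sink. It is NOT Serializable.
class SinkFactory {
    private final String prefix; // instance field of the enclosing class

    SinkFactory(String prefix) {
        this.prefix = prefix;
    }

    // Anonymous class: it reads an instance field of SinkFactory, so it
    // keeps a hidden reference to the enclosing SinkFactory instance.
    RecordSerializer<String> anonymousSerializer() {
        return new RecordSerializer<String>() {
            @Override
            public byte[] serialize(String record) {
                return (prefix + record).getBytes(StandardCharsets.UTF_8);
            }
        };
    }

    // Static nested class: no reference to SinkFactory. The state it needs
    // is copied in through the constructor.
    static class StandaloneSerializer implements RecordSerializer<String> {
        private static final long serialVersionUID = 1L;
        private final String prefix;

        StandaloneSerializer(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public byte[] serialize(String record) {
            return (prefix + record).getBytes(StandardCharsets.UTF_8);
        }
    }

    RecordSerializer<String> staticSerializer() {
        return new StandaloneSerializer(prefix);
    }
}

class SerializationDemo {
    // Returns true if Java serialization accepts the object, false if it
    // hits a non-serializable object somewhere in the object graph.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SinkFactory factory = new SinkFactory("event:");
        System.out.println("anonymous: " + isSerializable(factory.anonymousSerializer()));
        System.out.println("static nested: " + isSerializable(factory.staticSerializer()));
    }
}
```

Running `SerializationDemo` prints `anonymous: false` and `static nested: true`. Applied to the job in the question, the same idea would mean moving the schema into a static nested or top-level class that implements `KafkaRecordSerializationSchema<EventWatchRecordMeta>` and passing that to `setRecordSerializer(...)`, with anything it needs from the surrounding class handed over through its constructor or held in static fields.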