I am running my fat JAR on a Flink cluster. It reads from Kafka and saves to Cassandra. The code is:
final Properties prop = getProperties();
final FlinkKafkaConsumer<String> flinkConsumer =
        new FlinkKafkaConsumer<>(kafkaTopicName, new SimpleStringSchema(), prop);
flinkConsumer.setStartFromEarliest();
final DataStream<String> stream = env.addSource(flinkConsumer);
DataStream<Person> sensorStreaming = stream.flatMap(new FlatMapFunction<String, Person>() {
    @Override
    public void flatMap(String value, Collector<Person> out) throws Exception {
        try {
            out.collect(objectMapper.readValue(value, Person.class));
        } catch (JsonProcessingException e) {
            logger.error("Json Processing Exception", e);
        }
    }
});
savePersonDetails(sensorStreaming);
env.execute();
The Person POJO contains:
@Column(name = "event_time")
private Instant eventTime;
Storing an Instant requires a codec, which I register on the Cassandra side as below:
final Cluster cluster = ClusterManager.getCluster(cassandraIpAddress);
cluster.getConfiguration().getCodecRegistry().register(InstantCodec.instance);
When I run it standalone it works fine, but when I run it on a local cluster it throws the error below:
Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [timestamp <-> java.time.Instant]
at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:526)
at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
at com.datastax.driver.core.CodecRegistry.access$200(CodecRegistry.java:140)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)
I read the following document on registering custom serializers:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html
but InstantCodec is a third-party class. How can I register it?
I solved the problem. A LocalDateTime value was being emitted, and converting it with the same type caused the error above. I changed the type to java.util.Date and then it worked.
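For anyone applying the same fix, here is a minimal sketch of the conversion using only the JDK. The class name EventTimeConversion and the toDate helpers are hypothetical, not part of the original job, and treating a LocalDateTime as UTC is an assumption; use whatever zone your events are actually recorded in. As far as I know, the 3.x DataStax driver maps the CQL timestamp type to java.util.Date by default, which would explain why no extra codec is needed after the change.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;

// Hypothetical helper class: converts java.time values to java.util.Date
// before they are written to a Cassandra timestamp column.
class EventTimeConversion {

    // Instant already denotes a point on the UTC timeline, so the conversion
    // is direct. Date only keeps millisecond precision.
    public static Date toDate(Instant instant) {
        return Date.from(instant);
    }

    // LocalDateTime carries no zone information. ASSUMPTION: the value is
    // interpreted as UTC; replace ZoneOffset.UTC if your data uses another zone.
    public static Date toDate(LocalDateTime localDateTime) {
        return Date.from(localDateTime.toInstant(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        Instant instant = Instant.parse("2020-09-01T12:30:00Z");
        LocalDateTime local = LocalDateTime.of(2020, 9, 1, 12, 30, 0);

        // Both values describe the same moment under the UTC assumption,
        // so the two dates compare as equal.
        System.out.println(toDate(instant).equals(toDate(local)));
    }
}
```

In the POJO this would mean declaring eventTime as java.util.Date and calling one of these helpers wherever the value is populated.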