Search code examples

Register Java Class in Flink Cluster

I am running my Fat Jar in Flink Cluster which reads Kafka and saves in Cassandra, the code is,

        final Properties prop = getProperties();
        final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>
                                             (kafkaTopicName, new SimpleStringSchema(), prop);

        final DataStream<String> stream = env.addSource(flinkConsumer);
        DataStream<Person> sensorStreaming = stream.flatMap(new FlatMapFunction<String, Person>() {
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    logger.error("Json Processing Exception", e);

and The Person POJO contains,

    @Column(name = "event_time")
    private Instant eventTime;

There is codec required to store Instant as below for Cassandra side,

final Cluster cluster = ClusterManager.getCluster(cassandraIpAddress);

When i run standalone works fine, but when i run local cluster throws me an error as below,

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [timestamp <-> java.time.Instant]
    at com.datastax.driver.core.CodecRegistry.notFound(
    at com.datastax.driver.core.CodecRegistry.createCodec(
    at com.datastax.driver.core.CodecRegistry.findCodec(
    at com.datastax.driver.core.CodecRegistry.access$200(
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(

I read the below document for registering,

but InstantCodec is 3rd party one. How can i register it?


  • I solved the problem, there was LocalDateTime which was emitting from and when i was converting with same type, there was above error. I changed the type into java.util Date type then it worked.