cassandra apache-flink flink-streaming datastax-java-driver

Register Java Class in Flink Cluster


I am running my fat JAR on a Flink cluster. It reads from Kafka and saves to Cassandra. The code is:

        final Properties prop = getProperties();
        final FlinkKafkaConsumer<String> flinkConsumer = new FlinkKafkaConsumer<>
                                             (kafkaTopicName, new SimpleStringSchema(), prop);
        flinkConsumer.setStartFromEarliest();

        final DataStream<String> stream = env.addSource(flinkConsumer);
        DataStream<Person> sensorStreaming = stream.flatMap(new FlatMapFunction<String, Person>() {
            @Override
            public void flatMap(String value, Collector<Person> out) throws Exception {
                try {
                    out.collect(objectMapper.readValue(value, Person.class));
                } catch (JsonProcessingException e) {
                    logger.error("Json Processing Exception", e);
                }
            }
        });
        savePersonDetails(sensorStreaming);
        env.execute();

The Person POJO contains:

    @Column(name = "event_time")
    private Instant eventTime;

On the Cassandra side, a codec is required to store an Instant. I register it as follows:

    final Cluster cluster = ClusterManager.getCluster(cassandraIpAddress);
    cluster.getConfiguration().getCodecRegistry().register(InstantCodec.instance);

When I run the job standalone it works fine, but when I run it on a local cluster it throws the following error:

Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException: Codec not found for requested operation: [timestamp <-> java.time.Instant]
    at com.datastax.driver.core.CodecRegistry.notFound(CodecRegistry.java:679)
    at com.datastax.driver.core.CodecRegistry.createCodec(CodecRegistry.java:526)
    at com.datastax.driver.core.CodecRegistry.findCodec(CodecRegistry.java:506)
    at com.datastax.driver.core.CodecRegistry.access$200(CodecRegistry.java:140)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:211)
    at com.datastax.driver.core.CodecRegistry$TypeCodecCacheLoader.load(CodecRegistry.java:208)

I read the following documentation on registering custom serializers:

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/custom_serializers.html

but InstantCodec comes from a third-party library. How can I register it?


Solution

  • I solved the problem. A LocalDateTime value was being emitted, and when I converted it using that same type I got the error above. After I changed the type to java.util.Date, it worked.
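
For readers who want to try the same workaround, here is a minimal sketch of the conversion to java.util.Date, which the DataStax 3.x driver maps to the Cassandra timestamp type without an extra codec. The class name EventTimeConversion and the toDate helpers are hypothetical and not from the original post. Treating the LocalDateTime as UTC is also an assumption, so substitute the zone your data actually uses.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;

// Hypothetical helper class, not part of the original post.
public class EventTimeConversion {

    // Instant -> Date. Date holds milliseconds only, so any
    // sub-millisecond part of the Instant is dropped.
    static Date toDate(Instant instant) {
        return Date.from(instant);
    }

    // LocalDateTime carries no time zone. UTC is assumed here
    // purely for illustration.
    static Date toDate(LocalDateTime localDateTime) {
        return Date.from(localDateTime.toInstant(ZoneOffset.UTC));
    }

    public static void main(String[] args) {
        Date fromInstant = toDate(Instant.parse("2020-01-01T00:00:00Z"));
        Date fromLocal = toDate(LocalDateTime.of(2020, 1, 1, 0, 0));
        // Both values describe the same moment under the UTC assumption.
        System.out.println(fromInstant.getTime() == fromLocal.getTime());
    }
}
```

With a helper like this, the eventTime field in the POJO could be declared as a Date and populated while mapping the incoming JSON, so the job would no longer rely on InstantCodec being registered on the cluster side.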