Tags: java, apache-kafka, apache-flink, flink-streaming, flink-sql

Consuming JSON from Kafka in Flink


    package org.example;

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.formats.json.JsonDeserializationSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class DataStreamJob {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            KafkaSource<String> source = KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:19092")
                    .setTopics("1000pepeusdt", "1000pepeusdtt")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    // the error is on this line
                    .setValueOnlyDeserializer(new JsonDeserializationSchema<>(TradeEvent.class))
                    .build();

            env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                    .print();

            // Execute the job
            env.execute("Kafka Consumer Example");
        }
    }

This is what I'm trying to consume:

    {
      "e": "aggTrade",
      "E": 1684547379069,
      "a": 30161410,
      "s": "1000PEPEUSDT",
      "p": "0.0015036",
      "q": "665070",
      "f": 70012793,
      "l": 70012793,
      "T": 1684547378913,
      "m": true
    }

Solution

  • You're on the right track. One way you can resolve this is by calling the following (a fuller wiring sketch is shown at the end of this answer):

        .setValueOnlyDeserializer(new TradeEventDeserializationSchema())

    Your TradeEventDeserializationSchema would be a custom deserializer, which would use something like Jackson's ObjectMapper to deserialize each incoming event, e.g.:

        import com.fasterxml.jackson.databind.ObjectMapper;
        import com.fasterxml.jackson.databind.json.JsonMapper;
        import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
        import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

        import java.io.IOException;

        public class TradeEventDeserializationSchema extends AbstractDeserializationSchema<Event> {

            private static final long serialVersionUID = 1L;

            private transient ObjectMapper objectMapper;

            /**
             * For performance reasons it's better to create one ObjectMapper in this open method rather
             * than creating a new ObjectMapper for every record.
             */
            @Override
            public void open(InitializationContext context) {
                // JavaTimeModule is needed for Java 8 date/time (Instant) support
                objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
            }

            /**
             * If our deserialize method needed access to the information in the Kafka headers of a
             * KafkaConsumerRecord, we would have implemented a KafkaRecordDeserializationSchema instead of
             * extending AbstractDeserializationSchema.
             */
            @Override
            public Event deserialize(byte[] message) throws IOException {
                return objectMapper.readValue(message, Event.class);
            }
        }
    

    Event in this case would be your event POJO (the TradeEvent referenced in your job).
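    For the aggTrade payload in the question, that POJO might look roughly like the sketch below. The field names are my own assumptions; the key points are mapping the single-letter JSON keys with Jackson's @JsonProperty and keeping a public no-argument constructor so Flink can treat the class as a POJO:

        import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
        import com.fasterxml.jackson.annotation.JsonProperty;

        @JsonIgnoreProperties(ignoreUnknown = true)
        public class TradeEvent {

            @JsonProperty("e") public String eventType;      // "aggTrade"
            @JsonProperty("E") public long eventTime;        // 1684547379069
            @JsonProperty("a") public long aggTradeId;       // 30161410
            @JsonProperty("s") public String symbol;         // "1000PEPEUSDT"
            @JsonProperty("p") public String price;          // "0.0015036"
            @JsonProperty("q") public String quantity;       // "665070"
            @JsonProperty("f") public long firstTradeId;     // 70012793
            @JsonProperty("l") public long lastTradeId;      // 70012793
            @JsonProperty("T") public long tradeTime;        // 1684547378913
            @JsonProperty("m") public boolean buyerIsMaker;  // true

            // Flink POJOs need a public no-argument constructor
            public TradeEvent() {}
        }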

    You can find more details and an example at https://docs.immerok.cloud/docs/how-to-guides/development/deserializing-json-from-kafka-to-apache-flink-pojo/
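    Putting it together, the KafkaSource in your job would then be typed to the POJO instead of String. This is only a sketch; it assumes the TradeEventDeserializationSchema above is parameterized on TradeEvent and reuses the broker and topic settings from the question:

        KafkaSource<TradeEvent> source = KafkaSource.<TradeEvent>builder()
                .setBootstrapServers("localhost:19092")
                .setTopics("1000pepeusdt", "1000pepeusdtt")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // custom deserializer instead of JsonDeserializationSchema
                .setValueOnlyDeserializer(new TradeEventDeserializationSchema())
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .print();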