
How to read a JSON file in Apache Flink using Java


How can I read a JSON file in Apache Flink using Java? I have not been able to find a working example that reads a JSON file in Flink with Java and then applies some transformations to it. Any suggestions or code would be highly appreciated.


Solution

  • For using Kafka with the DataStream API, see https://stackoverflow.com/a/62072265/2000823. The idea is to implement an appropriate DeserializationSchema, or KafkaDeserializationSchema. There's an example (and pointers to more) in the answer I've linked to above.
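
    As a rough illustration of the DeserializationSchema idea, here is a minimal sketch that uses Jackson to turn each Kafka record value into a POJO. The wrapper class KafkaJsonExample, the Event type, and its fields (currency, revenueSum) are hypothetical names chosen for this example, and it assumes Jackson (com.fasterxml.jackson) and Flink are on the classpath.

    ```java
    import java.io.IOException;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class KafkaJsonExample {

        // Hypothetical event type; the field names are assumptions for illustration.
        public static class Event {
            public String currency;
            public double revenueSum;

            public Event() {} // public no-arg constructor so Flink can treat this as a POJO
        }

        public static class EventDeserializationSchema implements DeserializationSchema<Event> {
            private static final long serialVersionUID = 1L;

            // Static so the mapper is not serialized along with the schema instance.
            private static final ObjectMapper MAPPER = new ObjectMapper();

            @Override
            public Event deserialize(byte[] message) throws IOException {
                // Each record value is expected to be one UTF-8 encoded JSON object.
                return MAPPER.readValue(message, Event.class);
            }

            @Override
            public boolean isEndOfStream(Event nextElement) {
                return false; // a Kafka topic is treated as unbounded
            }

            @Override
            public TypeInformation<Event> getProducedType() {
                return TypeInformation.of(Event.class);
            }
        }
    }
    ```

    An instance of such a schema could then be handed to the Kafka connector, for example through the FlinkKafkaConsumer constructor or, in newer releases, KafkaSource.builder().setValueOnlyDeserializer(...). Which one applies depends on the Flink version in use.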

    Or if you want to use the Table API or SQL, it's easier. You can configure this with a bit of DDL. For example:

    CREATE TABLE minute_stats (
      `minute` TIMESTAMP(3),
      `currency` STRING,
      `revenueSum` DOUBLE,
      `orderCnt` BIGINT,
      WATERMARK FOR `minute` AS `minute` - INTERVAL '10' SECOND
    ) WITH (
      'connector.type' = 'kafka',       
      'connector.version' = 'universal',
      'connector.topic' = 'minute_stats',
      'connector.properties.zookeeper.connect' = 'not-needed',
      'connector.properties.bootstrap.servers' = 'kafka:9092',
      'connector.startup-mode' = 'earliest-offset',
      'format.type' = 'json'
    );
    

    For trying things out locally while reading from a file, you'll need to do things differently. Something like this:

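    // TextInputFormat emits the file line by line, so each line should hold one JSON document.
    DataStreamSource<String> rawInput = env.readFile(
        new TextInputFormat(new Path(fileLocation)), fileLocation);

    // Parse each line into an Event.
    DataStream<Event> events = rawInput.flatMap(new MyJSONTransformer());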
    

    where MyJSONTransformer might use a Jackson ObjectMapper to convert JSON into some convenient Event type (a POJO).
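
    To make that last step concrete, here is one way MyJSONTransformer could look. This is a sketch under assumptions, not a definitive implementation: the wrapper class JsonFileJob, the Event fields (currency, revenueSum), and the file path taken from args[0] are hypothetical. It also assumes the file holds one JSON object per line, uses env.readTextFile from the Flink 1.x DataStream API for brevity, and needs Jackson (com.fasterxml.jackson) on the classpath.

    ```java
    import java.io.IOException;

    import com.fasterxml.jackson.databind.DeserializationFeature;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class JsonFileJob {

        // Hypothetical event type; the field names are assumptions for illustration.
        public static class Event {
            public String currency;
            public double revenueSum;

            public Event() {} // public no-arg constructor so Flink can treat this as a POJO

            @Override
            public String toString() {
                return "Event{currency=" + currency + ", revenueSum=" + revenueSum + "}";
            }
        }

        public static class MyJSONTransformer implements FlatMapFunction<String, Event> {
            private static final long serialVersionUID = 1L;

            // Static so the mapper is not serialized with the function; unknown JSON fields are ignored.
            private static final ObjectMapper MAPPER = new ObjectMapper()
                    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

            @Override
            public void flatMap(String line, Collector<Event> out) {
                if (line == null || line.trim().isEmpty()) {
                    return; // nothing to parse on blank lines
                }
                try {
                    out.collect(MAPPER.readValue(line, Event.class));
                } catch (IOException e) {
                    // Malformed line: emit nothing. A real job might log it or route it to a side output.
                }
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.readTextFile(args[0])               // args[0]: path to the JSON-lines file (assumption)
               .flatMap(new MyJSONTransformer())    // String -> Event, dropping unparseable lines
               .filter(e -> e.revenueSum > 0)       // placeholder for whatever transformation is needed
               .print();

            env.execute("read-json-file");
        }
    }
    ```

    A flatMap is used rather than a map so that a bad line can produce zero output records instead of failing the job. If the file contains a single large JSON array rather than one object per line, this line-based approach would not apply as written.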