apache-kafka · apache-flink · kafka-topic

How to read Kafka Topic line by line in Flink program


First, I load a CSV file into a Kafka topic, and I can print the topic from a Flink program. The code is as follows:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties prop = new Properties();
    prop.setProperty("bootstrap.servers",
            "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
    prop.setProperty("group.id", "test");

    // Each Kafka record is deserialized as a single String (one CSV line)
    FlinkKafkaConsumer<String> myConsumer =
            new FlinkKafkaConsumer<>("flinkTopic", new SimpleStringSchema(), prop);
    myConsumer.setStartFromEarliest();

    DataStream<String> stream = env.addSource(myConsumer);
    stream.print();
    env.execute("Flink Streaming Java API Skeleton");

My question is: I want to read the topic line by line and process each line separately. Could you please guide me on how to read the Kafka topic line by line?

Any help would be really appreciated.


Solution

  • For examples of what you might do, I recommend working your way through the online Apache Flink Training. You can use operations like filter, map, flatMap, windows, and ProcessFunctions to process the stream line by line.

    You may be wondering how to conveniently work with CSV data. The easiest approach is to use the Table/SQL API, which has a Kafka Connector of its own, and a CSV Format.

    Without using Flink's SQL engine, you could implement a map function that transforms each line of text into a POJO. There's an example of that here. Or you could implement your own deserializer to use instead of SimpleStringSchema.
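To make the map-to-POJO idea concrete, here is a minimal sketch. The column names (`id`, `name`) are assumptions, not taken from the question's CSV file, so adjust the POJO and the parsing to your actual columns. The parsing logic itself is plain Java; the commented lines show how it would plug into the stream from the question via `map()`.

```java
// Hypothetical parser for one CSV line delivered as one Kafka record.
// Column layout (id,name) is an assumption; adapt it to your file.
public class CsvRowParser {

    // POJO representing one parsed line
    public static class Row {
        public final String id;
        public final String name;

        public Row(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Parse a single line of text into a Row.
    // split(",", -1) keeps empty trailing fields instead of dropping them.
    public static Row parse(String line) {
        String[] fields = line.split(",", -1);
        return new Row(fields[0].trim(), fields[1].trim());
    }

    // In the Flink job this would be applied once per record, e.g.:
    //   DataStream<Row> rows = stream.map(CsvRowParser::parse);
    //   rows.print();
    // Each Kafka record (one CSV line) then becomes one Row object,
    // which downstream operators can filter, key, or aggregate.
    public static void main(String[] args) {
        Row r = parse("42, apples");
        System.out.println(r.id + "|" + r.name);
    }
}
```

A `flatMap` variant of the same idea would let you drop malformed lines (emit zero records) instead of throwing, which is often preferable when the CSV file contains a header row or bad data.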