Tags: java, apache-kafka, apache-flink, map-function, data-stream

Flink map stream csv file into Tuple


I am trying to map a CSV file, which is produced to Kafka and then consumed by Flink, into a Tuple4. The CSV file has 4 columns, and I want to map each row into a Tuple4. The problem is that I do not know how to implement the csv2Tuple class and its map() method.

Here is where I am stuck:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);

DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
            new SimpleStringSchema(), parameterTool.getProperties()));

DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}

I would also like to parse the items in the tuple from String to Integer.


Solution

  • Suppose you produce every row of the CSV file as a separate Kafka message and consume it using the Flink Kafka connector. You then only need to split each consumed message on the comma (because it is a CSV file) and parse each field with Integer.parseInt().

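    // Requires org.apache.flink.api.common.functions.MapFunction
    // and org.apache.flink.api.java.tuple.Tuple4.
    DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(
            new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
                @Override
                public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
                    // Each Kafka message is one CSV row with four columns.
                    String[] temp = str.split(",");
                    // Parse each column from String to Integer.
                    return new Tuple4<>(
                            Integer.parseInt(temp[0]),
                            Integer.parseInt(temp[1]),
                            Integer.parseInt(temp[2]),
                            Integer.parseInt(temp[3])
                    );
                }
            });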
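
The map function above assumes that every message is a well-formed row of four integers. If the data may contain spaces around values or rows with a wrong number of columns, the parsing step could be pulled into a small helper that fails with a clear message. The following is only a sketch: CsvRowParser and parseRow are hypothetical names, not part of Flink, and the code uses nothing beyond the Java standard library.

```java
// Hypothetical helper, not part of Flink: parses one CSV row that is
// expected to hold exactly four integer columns.
final class CsvRowParser {
    static final int EXPECTED_COLUMNS = 4;

    static int[] parseRow(String line) {
        // The -1 limit keeps trailing empty fields, so "1,2,3," is seen as
        // four fields (the last one empty) instead of three.
        String[] fields = line.split(",", -1);
        if (fields.length != EXPECTED_COLUMNS) {
            throw new IllegalArgumentException(
                    "Expected " + EXPECTED_COLUMNS + " columns but got "
                            + fields.length + " in row: " + line);
        }
        int[] values = new int[EXPECTED_COLUMNS];
        for (int i = 0; i < EXPECTED_COLUMNS; i++) {
            // trim() tolerates spaces around a value, as in "1, 2, 3, 4".
            // parseInt throws NumberFormatException for non-numeric fields.
            values[i] = Integer.parseInt(fields[i].trim());
        }
        return values;
    }
}
```

Inside map(String str) you could then call int[] v = CsvRowParser.parseRow(str); and return new Tuple4<>(v[0], v[1], v[2], v[3]). The same body also works in a named class such as the csv2Tuple from the question, declared as implements MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>. Whether a malformed row should fail the job or be skipped depends on your pipeline; this sketch simply throws.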