I am trying to map a CSV file, produced by Kafka and consumed by Flink, into a Tuple4. My CSV file has 4 columns, and I want to map each row into a Tuple4. The problem is that I do not know how to implement the map() and csv2Tuple functions.
Here is where I am stuck:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(ARGS);
DataStreamSource<String> myConsumer = env.addSource(new FlinkKafkaConsumer082<>(parameterTool.getRequired("topic"),
new SimpleStringSchema(), parameterTool.getProperties()));
DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new csv2Tuple());
public static class csv2Tuple implements MapFunction<...> {public void map(){...}}
I would also like to parse the items in the tuple from String to Integer.
Suppose you produce each row of the CSV file as a Kafka message and consume it using the Flink Kafka connector. You just need to split every consumed message on the , delimiter (because it is a CSV file).
DataStream<Tuple4<Integer, Integer, Integer, Integer>> streamTuple = myConsumer.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
    @Override
    public Tuple4<Integer, Integer, Integer, Integer> map(String str) throws Exception {
        // Split the CSV row on commas and parse each field to an Integer.
        String[] temp = str.split(",");
        return new Tuple4<>(
                Integer.parseInt(temp[0]),
                Integer.parseInt(temp[1]),
                Integer.parseInt(temp[2]),
                Integer.parseInt(temp[3])
        );
    }
});
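One caveat: Integer.parseInt will throw a NumberFormatException on fields with surrounding whitespace or on rows with the wrong number of columns, which would fail the whole Flink job. A sketch of a more defensive approach is to pull the parsing into a small helper that trims each field and validates the column count; this helper (CsvParser and parseRow are illustrative names, not part of any Flink API) can also be unit-tested without a Flink runtime, and the map function would then just delegate to it:

```java
// Hypothetical helper: the parsing logic extracted from the MapFunction so it
// can be tested in isolation. Assumes exactly 4 integer columns per row.
class CsvParser {
    static int[] parseRow(String line) {
        String[] fields = line.split(",");
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "expected 4 fields, got " + fields.length + ": " + line);
        }
        int[] values = new int[4];
        for (int i = 0; i < 4; i++) {
            // trim() tolerates spaces around the value, e.g. "1, 2, 3, 4"
            values[i] = Integer.parseInt(fields[i].trim());
        }
        return values;
    }
}
```

Inside the MapFunction you would then build the Tuple4 from the returned array, and malformed rows fail with a message that names the offending line.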