apache-flink, flink-streaming

Flink use of .keyBy with KeySelector taking data from Kafka


I have a Flink job in Java that uses the Kafka connector. I'm getting the data from Kafka with no issues. As a first step, I do a .map that parses each message, including its timestamp. To use event-time windows, I extract the timestamp in milliseconds from the data and return it to Flink; to do this I used assignTimestampsAndWatermarks:

    DataStream<String> kafkaData = env.addSource(new FlinkKafkaConsumer<>("CorID_0", new SimpleStringSchema(), p));

    kafkaData.map(new MapFunction<
            String, Tuple19<String, String, String, String, String,
            String, Double, Long, Double, Long,
            Long, Integer, Long, Double, Long,
            Double, Double, Integer, Double>>()
    {
        public Tuple19<String, String, String, String, String,
                String, Double, Long, Double, Long,
                Long, Integer, Long, Double, Long,
                Double, Double, Integer, Double> map(String value)
        {
            String[] words = value.split(",");
            return new Tuple19<String, String, String, String, String,
                    String, Double, Long, Double, Long,
                    Long, Integer, Long, Double, Long,
                    Double, Double, Integer, Double>
                    (words[0], words[1], words[2], words[3], words[4], words[5], Double.parseDouble(words[6]),
                            Long.parseLong(words[7]), Double.parseDouble(words[8]), Long.parseLong(words[9]),
                            Long.parseLong(words[10]), Integer.parseInt(words[11]),
                            Long.parseLong(words[12]), Double.parseDouble(words[13]),
                            Long.parseLong(words[14]), Double.parseDouble(words[15]),
                            Double.parseDouble(words[16]), Integer.parseInt(words[17]),
                            Double.parseDouble(words[18]));
        }
    })

            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple19<String, String, String, String, String,
                    String, Double, Long, Double, Long,
                    Long, Integer, Long, Double, Long,
                    Double, Double, Integer, Double>>()
            {
                private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
                
                public long extractAscendingTimestamp(Tuple19<String, String, String, String, String,
                        String, Double, Long, Double, Long,
                        Long, Integer, Long, Double, Long,
                        Double, Double, Integer, Double> value)
                {
                    try
                    {
                        return sdf.parse(value.f3).getTime();
                    } catch (Exception e)
                    {
                        throw new RuntimeException("Parsing Error", e);
                    }
                }
            });
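A side note on the timestamp extraction above: in a SimpleDateFormat pattern the quoted 'Z' is matched as a literal character, not as a zone, so the parsed time is interpreted in the JVM's default time zone unless sdf.setTimeZone(...) is called. Assuming the messages really carry UTC ISO-8601 strings (an assumption based only on the pattern), java.time.Instant.parse reads the trailing Z as UTC and is thread-safe. The sketch below also splits with a limit of -1 so trailing empty fields are kept and the field count check is meaningful; the class and method names are hypothetical, not part of Flink.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

// Hypothetical helper (not a Flink API): extracts the event time of one CSV message.
final class EventTimeParser {
    static final int EXPECTED_FIELDS = 19; // Tuple19 in the question
    static final int TIMESTAMP_FIELD = 3;  // value.f3 / words[3] in the question

    static long eventTimeMillis(String line) {
        // limit -1 keeps trailing empty fields, so a short line is detected here
        String[] words = line.split(",", -1);
        if (words.length != EXPECTED_FIELDS) {
            throw new IllegalArgumentException(
                    "Expected " + EXPECTED_FIELDS + " fields but got " + words.length + ": " + line);
        }
        try {
            // Assumes ISO-8601 UTC, e.g. 2021-01-01T10:15:30.123Z
            return Instant.parse(words[TIMESTAMP_FIELD]).toEpochMilli();
        } catch (DateTimeParseException e) {
            // Keep the original exception as the cause instead of discarding it
            throw new IllegalArgumentException("Cannot parse timestamp in: " + line, e);
        }
    }
}
```

Under the same assumption, the body of extractAscendingTimestamp could simply return Instant.parse(value.f3).toEpochMilli().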

The second step is where the calculation starts. I need to manipulate the data, and to do that I need the fields parsed from the Kafka message. This is where I got stuck:

    DataStream<String> largeDelta = kafkaData.keyBy(new KeySelector<Tuple19<String, String, String, String, String,
            String, Double, Long, Double, Long,
            Long, Integer, Long, Double, Long,
            Double, Double, Integer, Double>, String>()
            {
                public String getKey(Tuple19<String, String, String, String, String,
                        String, Double, Long, Double, Long,
                        Long, Integer, Long, Double, Long,
                        Double, Double, Integer, Double> value)
                {
                    return value.f2;
                }
            })
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .process(new TrackChanges(5));
    largeDelta.writeAsText("/Alert.txt");
    env.execute("ABCD");
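To make concrete what keying on value.f2 followed by TumblingEventTimeWindows.of(Time.minutes(5)) does with each record, here is a plain-Java simulation (no Flink involved; the class and field names are hypothetical). Each element is routed to its key, and within that key to the 5-minute window [start, start + 5 min) whose start is the event timestamp rounded down to a multiple of the window size (epoch-aligned, window offset 0). Counting elements per key and window merely stands in for whatever TrackChanges computes.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simulation only: mirrors how keyBy + tumbling event-time windows group elements.
final class TumblingWindowSim {
    static final long WINDOW_SIZE_MS = 5 * 60 * 1000L; // Time.minutes(5)

    // Hypothetical minimal event: the key (value.f2 in the question) and its event time.
    static final class Event {
        final String key;
        final long timestampMs;

        Event(String key, long timestampMs) {
            this.key = key;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the epoch-aligned tumbling window that contains ts (offset 0).
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // key -> (window start -> number of events): one bucket per key and window.
    static Map<String, Map<Long, Integer>> countPerKeyAndWindow(List<Event> events, long size) {
        Map<String, Map<Long, Integer>> buckets = new TreeMap<>();
        for (Event e : events) {
            buckets.computeIfAbsent(e.key, k -> new TreeMap<>())
                   .merge(windowStart(e.timestampMs, size), 1, Integer::sum);
        }
        return buckets;
    }
}
```

In the real job, Flink hands each key/window bucket to the window function once the watermark reaches the end of that window.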

The problem is that I get an error message telling me "cannot resolve method 'KeyBy(anonymous org.apache.flink.api.java.functions....'"


Any help would be really welcome, as I'm struggling to understand what I'm missing.

Thanks


Solution

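  • Your new MapFunction<...>() converts each incoming String into a Tuple19<...>, and your KeySelector<Tuple19<...>, String> expects that Tuple19 as its input. But the stream returned by kafkaData.map(...).assignTimestampsAndWatermarks(...) is never stored, so kafkaData is still a DataStream<String>, and keyBy on it cannot accept that KeySelector. That is what the "cannot resolve method" error is telling you.

    Assign the result of kafkaData.map(new MapFunction<...>() {...}).assignTimestampsAndWatermarks(...) to a DataStream<Tuple19<...>> (e.g. DataStream<Tuple19<...>> parsed = ...), and then call keyBy on that stream instead of on kafkaData.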

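    That said, even once keyBy is called on the parsed stream, keyBy().window().process() over Tuple19 elements only produces a DataStream<String> largeDelta if TrackChanges is a ProcessWindowFunction whose output type is String (i.e. ProcessWindowFunction<Tuple19<...>, String, String, TimeWindow>). If it emits anything else, that is a second type error waiting behind the first one.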

    As an aside, for simple key selectors you can use a lambda expression instead of an anonymous class, e.g. keyBy(r -> r.f2) on the parsed stream.