java apache-flink flink-streaming

Java - Flink -> Fastest way to filter List of Tuples


I have a stream coming from Kafka that I'm processing with Flink. Part of each record is an Integer that I'm monitoring, which indicates a height. I would like to save "height" and "counter" into a list of Tuple2<Integer, Integer>, where the first element f0 is the height and the second element f1 is the number of times the same height has been seen in the stream so far.

input: 170 -> ListofTuples[170,1]

input: 170 -> ListofTuples[170,2]

input: 120 -> ListofTuples[170,2], [120,1]

input: 140 -> ListofTuples[170,2], [120,1], [140,1]

input: 140 -> ListofTuples[170,2], [120,1], [140,2]

input: 140 -> ListofTuples[170,2], [120,1], [140,3]

input: 170 -> ListofTuples[170,3], [120,1], [140,3]

And so on.

What is the fastest way to look up a height in the list of tuples and update its counter?

Thanks


Solution

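  • You could keyBy the height field and use a stateful MapFunction (a RichMapFunction, so that it can access keyed state) that updates a ValueState holding the count of how many times the current key has been seen. The state is scoped to the key, so there is no list to scan.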

    You didn't say what you want as the result of the stream, though. My guess is that you want to emit the <height, count> tuple each time it is updated, which is easy to output from your MapFunction.

    See the word count example for a similar case: it counts word occurrences rather than integer value occurrences.
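
The counting idea behind this answer can be sketched outside Flink. Keyed state behaves roughly like a map from key to value, so updating a counter becomes a single lookup by height instead of a scan over a list of tuples. The following is a minimal plain-Java sketch of that idea, not Flink API code: the class name HeightCounter and the methods observe and countOf are hypothetical, and an int[] pair stands in for Tuple2<Integer, Integer>. In a Flink job, the HashMap would be replaced by a ValueState<Integer> inside a function applied after keyBy.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that counts how often each height has been seen. */
final class HeightCounter {
    // height -> number of occurrences seen so far
    private final Map<Integer, Integer> counts = new HashMap<>();

    /** Records one occurrence and returns the updated {height, count} pair. */
    int[] observe(int height) {
        // merge() inserts 1 for a new height, otherwise adds 1 to the stored count
        int updated = counts.merge(height, 1, Integer::sum);
        return new int[] {height, updated};
    }

    /** Returns the current count for a height, or 0 if it was never seen. */
    int countOf(int height) {
        return counts.getOrDefault(height, 0);
    }

    public static void main(String[] args) {
        HeightCounter counter = new HeightCounter();
        // Same input sequence as in the question
        for (int h : new int[] {170, 170, 120, 140, 140, 140, 170}) {
            int[] pair = counter.observe(h);
            System.out.println("input: " + h + " -> [" + pair[0] + "," + pair[1] + "]");
        }
    }
}
```

Each call to observe emits the updated pair for the height that just arrived, which corresponds to emitting the <height, count> tuple from the map function. After the seven inputs from the question, the counts are 3 for 170, 1 for 120, and 3 for 140.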