apache-flink

flink DataStream keyBy API


I am new to Flink. The following is a word count in streaming mode:

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala._

//x is the stream of (word, 1)
val x: DataStream[(String, Int)] = text
  .flatMap(_.toLowerCase.split("\\W+"))
  .map((_, 1))

//keyBy on the word field: what does the Tuple in y's type mean?
val y: KeyedStream[(String, Int), Tuple] = x.keyBy(0)

val z: DataStream[(String, Int)] = y.sum(1)

z.print()

Suppose x is the stream ("a",1), ("b",1), ("c",1), ("a",1), ("c",1), ("c",1). What will y look like (I don't understand what Tuple means here), and what will z look like?


Solution

  • When you specify keyBy(0), you are keying the stream by the first element of the tuples in the stream; in other words, you are keying it by the word string. However, the compiler can't figure out that the keys are Strings, so this version of keyBy always treats the key as a Tuple (Flink's org.apache.flink.api.java.tuple.Tuple) that wraps the actual key object.

    If you rewrite the keyBy as keyBy(_._1), the compiler can infer the key type, and y will be a KeyedStream[(String, Int), String], which is easier to work with.
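The typing difference can be illustrated without Flink. In this plain-Scala analogy (the helpers `keyByPosition` and `keyBySelector` are hypothetical, not Flink API), a key taken by field position comes back wrapped in a tuple of `Any`, because an `Int` index tells the compiler nothing about which field's type it refers to, while a selector function like `_._1` lets the compiler infer `String`.

```scala
object KeyTypeDemo {
  // Positional key (analogy for keyBy(0)): the index is just an Int, so the
  // compiler cannot know the field's type; the key comes back wrapped as Tuple1[Any].
  def keyByPosition(record: Product, pos: Int): Tuple1[Any] =
    Tuple1(record.productElement(pos))

  // Selector key (analogy for keyBy(_._1)): the result type K is inferred at compile time.
  def keyBySelector[T, K](record: T)(selector: T => K): K =
    selector(record)

  def main(args: Array[String]): Unit = {
    val record: (String, Int) = ("a", 1)
    val k1: Tuple1[Any] = keyByPosition(record, 0) // static type: Tuple1[Any]
    val k2: String = keyBySelector(record)(_._1)   // static type: String
    println(s"$k1 $k2")
  }
}
```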

    Keying the stream partitions it, similar to the way GROUP BY in SQL splits a table into disjoint, non-overlapping groups. So in this case the stream ("a",1), ("b",1), ("c",1), ("a",1), ("c",1), ("c",1) is logically split into three groups:

    ("a",1), ("a",1)
    ("b",1)
    ("c",1), ("c",1), ("c",1)
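The logical split can be reproduced on an in-memory list. This plain-Scala sketch only models the grouping that keyBy implies; Flink never materializes the groups, it routes each record by key as it arrives. `groupByKeyInOrder` is a hypothetical helper that groups by the first field while keeping each group's records in arrival order.

```scala
object KeyByGroupsDemo {
  // Groups records by key, keeping keys in first-appearance order
  // and records within each group in arrival order.
  def groupByKeyInOrder[K, V](records: Seq[(K, V)]): Seq[(K, Seq[(K, V)])] = {
    val keysInOrder = records.map(_._1).distinct
    keysInOrder.map(k => k -> records.filter(_._1 == k))
  }

  def main(args: Array[String]): Unit = {
    val x = Seq(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("c", 1))
    // Prints one line per group: (a,1), (a,1) / (b,1) / (c,1), (c,1), (c,1)
    groupByKeyInOrder(x).foreach { case (_, group) => println(group.mkString(", ")) }
  }
}
```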
    

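    Then sum(1) reduces (in the map/reduce sense) each of these groups by adding up the second fields of all the tuples in the group, so ("a",1), ("a",1) becomes ("a",2), and so on. Because a stream has no end, this is a rolling aggregation: every incoming record produces an updated sum for its key, rather than one final result per group.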
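In the batch (map/reduce) sense, summing field 1 of each group yields one final tuple per key. The plain-Scala sketch below shows that view; it assumes the groups are already complete, which a bounded list allows but an unbounded stream does not. `sumPerKey` is a hypothetical helper, not Flink API.

```scala
object GroupSumDemo {
  // Reduces each key's group to a single total by adding up the second fields.
  def sumPerKey(records: Seq[(String, Int)]): Map[String, Int] =
    records.groupBy(_._1).map { case (k, group) => k -> group.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val x = Seq(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("c", 1))
    // A Map has no guaranteed order, so sort by key for display: (a,2), (b,1), (c,3)
    sumPerKey(x).toSeq.sortBy(_._1).foreach(println)
  }
}
```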

    Rather than using z=y.sum(1), it might be easier to understand this written out more fully as

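    import org.apache.flink.api.common.functions.ReduceFunction

    // t1 is the key's previous result, t2 is the newly arrived record
    val z: DataStream[(String, Int)] = y.reduce(new ReduceFunction[(String, Int)] {
      override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) =
        (t1._1, t1._2 + t2._2)
    })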
    
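Because the stream never ends, Flink can't wait for a group to be complete. Instead, the reduce function runs once per incoming record, combining the key's previous result (t1) with the new record (t2) and emitting the updated value; the first record for a key is emitted unchanged. The plain-Scala sketch below models that rolling behavior with a mutable map as the per-key state. `rollingReduce` is a hypothetical helper, not Flink API, and it processes records sequentially rather than in parallel.

```scala
import scala.collection.mutable

object RollingReduceDemo {
  // Models a keyed rolling reduce: for every record, combine it with the
  // key's previous result (if any), store the result as state, and emit it.
  def rollingReduce[K, T](input: Seq[T])(key: T => K)(reduce: (T, T) => T): Seq[T] = {
    val state = mutable.Map.empty[K, T]
    input.map { record =>
      val k = key(record)
      val updated = state.get(k) match {
        case Some(previous) => reduce(previous, record)
        case None           => record // first record for this key passes through as-is
      }
      state(k) = updated
      updated
    }
  }

  def main(args: Array[String]): Unit = {
    val x = Seq(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("c", 1))
    // Same combining logic as the ReduceFunction above.
    val z = rollingReduce(x)(_._1)((t1, t2) => (t1._1, t1._2 + t2._2))
    z.foreach(println) // (a,1), (b,1), (c,1), (a,2), (c,2), (c,3), one per line
  }
}
```

Each input record yields exactly one output record, which is why the streaming output contains intermediate sums such as (c,1) and (c,2) before (c,3).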

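    You can see precisely what z looks like by running the code. With a parallelism of at least 3, the three keys can be processed by separate parallel instances (threads), although this isn't guaranteed: keys are assigned to instances by hashing, so two keys may end up sharing one. When I ran it, I got these results: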

    3> (a,1)
    2> (c,1)
    1> (b,1)
    2> (c,2)
    2> (c,3)
    3> (a,2)
    

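    where the 1>, 2>, and 3> prefixes indicate which parallel instance (thread) produced each line of output. The sums for each key appear in order, but lines from different instances can interleave arbitrarily.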
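Which numbered instance a key lands on is decided by hashing the key, so all records for one key go to the same instance, while distinct keys are not guaranteed distinct instances. The sketch below is a simplified model: Flink actually applies a murmur hash to the key's hashCode to choose a key group and then maps key groups to subtasks, whereas this hypothetical `subtaskFor` helper substitutes a plain non-negative hashCode modulo the parallelism, so its assignments will not match Flink's.

```scala
object KeyRoutingDemo {
  // Simplified stand-in for Flink's key-group assignment:
  // non-negative hashCode modulo parallelism (NOT Flink's real formula).
  def subtaskFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val x = Seq(("a", 1), ("b", 1), ("c", 1), ("a", 1), ("c", 1), ("c", 1))
    val parallelism = 3
    // Every record with the same key is routed to the same subtask.
    x.groupBy { case (word, _) => subtaskFor(word, parallelism) }
      .toSeq.sortBy(_._1)
      .foreach { case (subtask, records) =>
        // 1-based prefix, mimicking the print sink's "n>" style.
        println(s"${subtask + 1}> ${records.mkString(", ")}")
      }
  }
}
```

In this model, changing `parallelism` to 2 sends "a" (hashCode 97) and "c" (hashCode 99) to the same subtask, illustrating that two keys can share an instance.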