scala apache-kafka streaming apache-flink

How to implement Window Function in Apache Flink?


Hi everyone. I have a Kafka topic as a source, and I group it into 1-minute windows. Within each window I want to create new columns using window functions as in SQL, for example:

  • SUM(amount) OVER(PARTITION BY
  • COUNT(user) OVER(PARTITION BY
  • ROW_NUMBER() OVER(PARTITION BY

Can I use DataStream functions for these operations? Or how can I convert my Kafka data to a Table and use sqlQuery?

The destination is another Kafka topic.

    val stream = senv
      .addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

I've tried this:

    val tableA = tableEnv.fromDataStream(stream, 'user, 'product, 'amount)

but I get the following error:

    Exception in thread "main" org.apache.flink.table.api.ValidationException: Too many fields referenced from an atomic type.

Test data

    1,"beer",3
    1,"beer",1
    2,"beer",3
    3,"diaper",4
    4,"diaper",1
    5,"diaper",5
    6,"rubber",2

Query example

    SELECT
      user, product, amount,
      COUNT(user) OVER(PARTITION BY product) AS count_product
    FROM table;

Expected output

    1,"beer",3,3
    1,"beer",1,3
    2,"beer",3,3
    3,"diaper",4,3
    4,"diaper",1,3
    5,"diaper",5,3
    6,"rubber",2,1

Solution

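  • The source is a DataStream[String], which Flink treats as an atomic type with a single field, so three field names cannot be mapped onto it. You need to parse each string into fields first and then rename them.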

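    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Sample input, here without quotes around the product name
    val stream = env.fromElements("1,beer,3",
      "1,beer,1", "2,beer,3", "3,diaper,4", "4,diaper,1", "5,diaper,5", "6,rubber,2")

    // Split each line into a (user, product, amount) tuple
    val parsed = stream.map(x => {
      val arr = x.split(",")
      (arr(0).toInt, arr(1), arr(2).toInt)
    })

    // Rename the tuple fields _1, _2, _3 to column names
    val tableA = tEnv.fromDataStream(parsed, $"_1" as "user", $"_2" as "product", $"_3" as "amount")

    // example query; `user` is a reserved keyword in Flink SQL, hence the backticks
    val result = tEnv.sqlQuery(s"SELECT `user`, product, amount from $tableA")

    val rs = result.toAppendStream[(Int, String, Int)]

    rs.print()

    // Nothing runs until the job is submitted
    env.execute()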
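
The sample above uses unquoted product names, while the test data in the question has them quoted (`1,"beer",3`). A minimal sketch of a more forgiving parser in plain Scala follows; `ParseSketch` and `parseLine` are hypothetical names introduced for illustration, and the sketch assumes that fields never contain commas.

```scala
import scala.util.Try

object ParseSketch {
  // Hypothetical helper: turns one line such as `1,"beer",3` into
  // (user, product, amount). Returns None for malformed input instead
  // of throwing. Assumes fields never contain commas.
  def parseLine(line: String): Option[(Int, String, Int)] =
    line.split(",").map(_.trim) match {
      case Array(user, product, amount) =>
        // Strip optional surrounding double quotes from the product name
        val name = product.stripPrefix("\"").stripSuffix("\"")
        Try((user.toInt, name, amount.toInt)).toOption
      case _ => None
    }

  def main(args: Array[String]): Unit = {
    println(parseLine("1,\"beer\",3"))
    println(parseLine("not,a,record"))
  }
}
```

In the Flink job this could probably replace the `map` step with something like `stream.flatMap(line => ParseSketch.parseLine(line).toList)`, so that malformed records are dropped rather than failing the job.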
    

    I'm not sure how to implement the desired window function in Flink SQL. Alternatively, it can be implemented with the DataStream API as follows:

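    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    // Event-time windows only fire if timestamps and watermarks are assigned upstream.
    // Time.minutes(1) would match the 1-minute window from the question.
    parsed.keyBy(x => x._2) // key by product
          .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
          .process(new ProcessWindowFunction[
            (Int, String, Int), (Int, String, Int, Int), String, TimeWindow
          ]() {
            override def process(key: String, context: Context,
                                 elements: Iterable[(Int, String, Int)],
                                 out: Collector[(Int, String, Int, Int)]): Unit = {
              // Emit every element of the window with the per-product count appended
              val lst = elements.toList
              lst.foreach(x => out.collect((x._1, x._2, x._3, lst.size)))
            }
          })
          .print()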
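
The per-window logic can be checked without a Flink cluster. The following is a minimal sketch in plain Scala collections that computes, for the rows of one window, the three columns asked about in the question: `COUNT(user)`, `SUM(amount)` and `ROW_NUMBER()`, each partitioned by product. `OverPartitionSketch` and `withWindowColumns` are hypothetical names, and `ROW_NUMBER()` is assumed here to follow arrival order, since the question gives no `ORDER BY`.

```scala
import scala.collection.mutable

object OverPartitionSketch {
  type Row = (Int, String, Int) // (user, product, amount)

  // Appends count, sum of amount and row number per product to each row.
  // Row numbers follow the order in which rows appear in the input
  // (an assumption; SQL would normally require an ORDER BY).
  def withWindowColumns(rows: List[Row]): List[(Int, String, Int, Int, Int, Int)] = {
    val byProduct = rows.groupBy(_._2)
    val seen = mutable.Map.empty[String, Int] // rows emitted so far per product
    rows.map { case (user, product, amount) =>
      val partition = byProduct(product)
      val rowNumber = seen.getOrElse(product, 0) + 1
      seen(product) = rowNumber
      (user, product, amount, partition.size, partition.map(_._3).sum, rowNumber)
    }
  }

  def main(args: Array[String]): Unit = {
    val window = List((1, "beer", 3), (1, "beer", 1), (2, "beer", 3),
      (3, "diaper", 4), (4, "diaper", 1), (5, "diaper", 5), (6, "rubber", 2))
    withWindowColumns(window).foreach(println)
  }
}
```

Inside a `ProcessWindowFunction`, the same function could be applied to `elements.toList` and each resulting tuple passed to `out.collect`, with the output type widened to match. If the stream is keyed by product, every window already holds a single partition, so the `groupBy` becomes a no-op but does no harm.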