apache-flink, flink-streaming, flink-sql

Optimized Top-N query using Flink SQL


I'm trying to run a streaming Top-N query using Flink SQL but can't get the "optimized version" outlined in the Flink docs to work. The setup is as follows:

I've got a Kafka topic where each record contains a tuple (GUID, reached score, maximum possible score). Think of it as a student taking an assessment: the tuple records how many points they achieved out of the maximum possible.

What I want to get is a list of the five GUIDs with the highest score, measured as a percentage (i.e. sorted by SUM(reached_score) / SUM(maximum possible score)).
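
For concreteness, the input can be pictured as a stream of (guid, reached_score, max_score) tuples. The following is only a sketch: it uses fromElements with values taken from the output further below, while the real job reads from a Kafka consumer, and the name scoreStream is just for illustration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Sketch of the input shape only; the actual source is a Kafka consumer.
// Each record is (guid, reached_score, max_score).
DataStream<Tuple3<String, Integer, Integer>> scoreStream = env.fromElements(
        Tuple3.of("63992935-9684-4285-8c2b-1fd57b51b48f", 97, 200),
        Tuple3.of("d7847f58-a4d9-40f8-a38d-161821b48481", 67, 200));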

I started by aggregating the scores and grouping them by GUID:

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);

Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);

Table aggregatedScores = tableEnv.sqlQuery(
        "SELECT " +
        "  guid, " +
        "  SUM(reached_score) as reached_score, " +
        "  SUM(max_score) as max_score, " +
        "  SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
        "FROM scores " +
        "GROUP BY guid");

tableEnv.registerTable("agg_scores", aggregatedScores);
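
The aggregation produces an updating table: every new tuple for a GUID retracts and re-emits that GUID's sums and score. As a sketch, it can be inspected the same way the final result is printed later:

// Prints (change flag, guid, reached_score, max_score, score) for every update.
tableEnv.toRetractStream(aggregatedScores, Row.class).print();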

The resulting table contains an unsorted list of aggregated scores. I then fed it into the Top-N query as shown in the Flink documentation:

Table topN = tableEnv.sqlQuery(
        "SELECT guid, reached_score, max_score, score, row_num " +
        "FROM (" +
        "   SELECT *," +
        "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
        "   FROM agg_scores)" +
        "WHERE row_num <= 5");

tableEnv.toRetractStream(topN, Row.class).print();

This query runs roughly as expected and produces multiple updates when the order of the elements changes.

// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)

// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)

// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
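
For reference, the leading boolean in each printed tuple is the retract-stream change flag: true adds a row, false retracts a previously emitted one. A minimal sketch that keeps only the additions from the same topN table:

tableEnv.toRetractStream(topN, Row.class)
        // f0 is the change flag: true = add this row, false = retract it
        .filter(change -> change.f0)
        .print();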

I then followed the advice from the docs and removed the row_num column from the outer projection:

Table topN = tableEnv.sqlQuery(
    "SELECT guid, reached_score, max_score, score " +
    "FROM (" +
    "   SELECT *," +
    "       ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
    "   FROM agg_scores)" +
    "WHERE row_num <= 5");

Running a similar dataset through this query:

// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)

// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)

// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)

What I don't understand is:

  • why the first entry (63992935-9684-4285-8c2b-1fd57b51b48f) is removed and re-added / touched at all
  • why the second entry is added again first and only then retracted. Wouldn't that effectively remove it from the data stream?

Both are obviously related to the sort order changing, but isn't this exactly what the optimized Top-N query (described further down in the documentation) is supposed to avoid?


Solution

  • I've checked this issue and can reproduce it in my local environment. I did some investigation, and the reason is:

    "we didn't do such optimization for some scenarios, and your case seems to be one of them".

    However, according to the user documentation, I think it's a valid request to apply this optimization in your scenario as well. It looks like a BUG to me: we claim an optimization that doesn't take effect here.

    I've created an issue, https://issues.apache.org/jira/browse/FLINK-15497, to track this; hopefully we can fix it in the upcoming 1.9.2 and 1.10.0 releases.

    Thanks for reporting this.