I'm trying to run a streaming top-n query using Flink SQL but can't get the "optimized version" outlined in the Flink docs working. The setting is as follows:
I've got a Kafka topic where each record contains a tuple (GUID, reached score, maximum possible score). Think of each record as a student taking an assessment, with the tuple representing how many points they achieved.
What I want to get is a list of the five GUIDs with the highest score measured as a percentage (i.e. sorted by SUM(reached_score) / SUM(maximum possible score)).
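To make the target result concrete, here is a minimal plain-Java sketch of the same computation without Flink. The class `TopScoresSketch`, the `Score` holder, and the sample values are hypothetical and only for illustration; it also assumes every GUID has a total maximum score greater than zero.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopScoresSketch {

    /** One input record: (GUID, reached score, maximum possible score). */
    static final class Score {
        final String guid;
        final int reached;
        final int max;

        Score(String guid, int reached, int max) {
            this.guid = guid;
            this.reached = reached;
            this.max = max;
        }
    }

    /** Returns up to n GUIDs ordered by SUM(reached) / SUM(max), best first. */
    static List<String> topN(List<Score> records, int n) {
        // guid -> {sum of reached scores, sum of maximum scores}
        Map<String, long[]> sums = new LinkedHashMap<>();
        for (Score r : records) {
            long[] s = sums.computeIfAbsent(r.guid, k -> new long[2]);
            s[0] += r.reached;
            s[1] += r.max;
        }
        List<String> guids = new ArrayList<>(sums.keySet());
        // Sort by the percentage, highest first (assumes the max sum is > 0).
        guids.sort(Comparator.comparingDouble(
                (String g) -> (double) sums.get(g)[0] / sums.get(g)[1]).reversed());
        return new ArrayList<>(guids.subList(0, Math.min(n, guids.size())));
    }

    public static void main(String[] args) {
        // Illustrative values only.
        List<Score> records = Arrays.asList(
                new Score("guid-a", 97, 200),
                new Score("guid-b", 67, 200),
                new Score("guid-b", 162, 200));
        System.out.println(topN(records, 5));
    }
}
```

This batch version recomputes everything from scratch; the streaming query has to produce the same ranking incrementally as new records arrive.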
I started by aggregating the scores and grouping them by GUID:
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
Table scores = tableEnv.fromDataStream(/* stream from kafka */, "guid, reached_score, max_score");
tableEnv.registerTable("scores", scores);
Table aggregatedScores = tableEnv.sqlQuery(
"SELECT " +
" guid, " +
" SUM(reached_score) as reached_score, " +
" SUM(max_score) as max_score, " +
" SUM(reached_score) / CAST(SUM(max_score) AS DOUBLE) as score " +
"FROM scores " +
"GROUP BY guid");
tableEnv.registerTable("agg_scores", aggregatedScores);
The resulting table contains an unsorted list of aggregated scores. I then tried to feed it into the Top-N query as it is used in the Flink documentation:
Table topN = tableEnv.sqlQuery(
"SELECT guid, reached_score, max_score, score, row_num " +
"FROM (" +
" SELECT *," +
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
" FROM agg_scores)" +
"WHERE row_num <= 5");
tableEnv.toRetractStream(topN, Row.class).print();
This query behaves roughly as expected and emits multiple updates whenever the order of the elements changes:
// add first entry
6> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
// add a second entry with lower score below the first one
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
// update the second entry with a much higher score
8> (false,d7847f58-a4d9-40f8-a38d-161821b48481,67,200,0.335,2)
1> (true,d7847f58-a4d9-40f8-a38d-161821b48481,229,400,0.5725,1)
3> (true,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,2)
2> (false,63992935-9684-4285-8c2b-1fd57b51b48f,97,200,0.485,1)
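For reading this output: in a retract stream, `true` marks a row being added and `false` marks a previously emitted row being retracted, and the `N>` prefix is the index of the parallel subtask that printed the line, so lines from different subtasks can interleave. The following plain-Java sketch (the class `RetractReplaySketch` is hypothetical and not part of Flink) replays the messages above into a counted set to show the table they add up to.

```java
import java.util.HashMap;
import java.util.Map;

class RetractReplaySketch {

    /** Materialized result: row -> number of times it is currently present. */
    private final Map<String, Integer> rows = new HashMap<>();

    /** Applies one message: true adds the row, false retracts it. */
    void apply(boolean isAdd, String row) {
        rows.merge(row, isAdd ? 1 : -1, Integer::sum);
        // Drop rows whose count is back to zero. Counting (instead of a plain
        // set) keeps the result correct if a retraction is seen before its add.
        rows.remove(row, 0);
    }

    Map<String, Integer> current() {
        return rows;
    }

    public static void main(String[] args) {
        String a = "63992935-9684-4285-8c2b-1fd57b51b48f";
        String b = "d7847f58-a4d9-40f8-a38d-161821b48481";
        RetractReplaySketch view = new RetractReplaySketch();
        // Messages in the order they were printed above.
        view.apply(true, a + ",97,200,0.485,1");
        view.apply(true, b + ",67,200,0.335,2");
        view.apply(false, b + ",67,200,0.335,2");
        view.apply(true, b + ",229,400,0.5725,1");
        view.apply(true, a + ",97,200,0.485,2");
        view.apply(false, a + ",97,200,0.485,1");
        view.current().keySet().forEach(System.out::println);
    }
}
```

After the replay, only two rows remain: the `d7847f58` entry at rank 1 and the `63992935` entry at rank 2. The first entry has to be retracted and re-added here because its `row_num` column changed.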
I then followed the advice from the docs and removed the row_number from the projection:
Table topN = tableEnv.sqlQuery(
"SELECT guid, reached_score, max_score, score " +
"FROM (" +
" SELECT *," +
" ROW_NUMBER() OVER (ORDER BY score DESC) as row_num" +
" FROM agg_scores)" +
"WHERE row_num <= 5");
Running this on a similar dataset gives:
// add first entry
4> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56)
// add a second entry with lower score below the first one
5> (true,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
// update the second entry with a much higher score
7> (true,d7847f58-a4d9-40f8-a38d-161821b48481,354,400,0.885)
1> (true,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
8> (false,63992935-9684-4285-8c2b-1fd57b51b48f,112,200,0.56) <-- ???
6> (false,d7847f58-a4d9-40f8-a38d-161821b48481,76,200,0.38)
What I don't understand is why the first entry (63992935-9684-4285-8c2b-1fd57b51b48f) is removed and added again, or why it is still touched at all (the lines marked ??? above).
Both updates are obviously related to the sort order changing, but isn't this what the optimized top-n query (described further down in the documentation) is supposed to solve?
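The expectation behind this question can be sketched in plain Java: once the rank is no longer part of the output, a row whose visible columns are unchanged should need no update. The class `TopNDiffSketch` below is hypothetical and is not how Flink implements Top-N; it only computes the smallest set of retract-style messages between two top-n results keyed by GUID.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopNDiffSketch {

    /** Emits retract-style messages that turn the old top-n result into the new one. */
    static List<String> diff(Map<String, String> oldTop, Map<String, String> newTop) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> e : oldTop.entrySet()) {
            // Retract rows that left the top-n or whose visible columns changed.
            if (!e.getValue().equals(newTop.get(e.getKey()))) {
                out.add("(false," + e.getKey() + "," + e.getValue() + ")");
            }
        }
        for (Map.Entry<String, String> e : newTop.entrySet()) {
            // Add rows that entered the top-n or whose visible columns changed.
            if (!e.getValue().equals(oldTop.get(e.getKey()))) {
                out.add("(true," + e.getKey() + "," + e.getValue() + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String a = "63992935-9684-4285-8c2b-1fd57b51b48f";
        String b = "d7847f58-a4d9-40f8-a38d-161821b48481";

        Map<String, String> before = new LinkedHashMap<>();
        before.put(a, "112,200,0.56");
        before.put(b, "76,200,0.38");

        // The second entry overtakes the first; the first entry's columns are unchanged.
        Map<String, String> after = new LinkedHashMap<>();
        after.put(b, "354,400,0.885");
        after.put(a, "112,200,0.56");

        diff(before, after).forEach(System.out::println);
    }
}
```

For the data above, this yields one retraction and one addition, both for the `d7847f58` entry, and nothing for the `63992935` entry, which is the behaviour the question expects from the optimized query.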
I've checked this issue and can reproduce it in my local environment. I also did some investigation, and the reason for this is:
"we didn't do such optimization for some scenarios, and your case seems to be one of them".
However, according to the user documentation, I think it's a valid request to include this optimization for your scenario as well. It looks like a bug to me: we claim some optimizations, but they don't work here.
I've created an issue, https://issues.apache.org/jira/browse/FLINK-15497, to track this. Hopefully we can fix it in the upcoming 1.9.2 and 1.10.0 releases.
Thanks for reporting this.