I am reading the examples at https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream and have a question about the following two.
Example 1:
// === EXAMPLE 1 ===
// interpret the stream as a retract stream
// create a changelog DataStream
val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))
// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(dataStream)
// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
  .print()
// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+
Example 2:
// === EXAMPLE 2 ===
// convert to DataStream in the simplest and most general way possible (no event-time)
val simpleTable = tableEnv
  .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
  .as("name", "score")
  .groupBy($"name")
  .select($"name", $"score".sum())
tableEnv
  .toChangelogStream(simpleTable)
  .executeAndCollect()
  .foreach(println)
// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]
For these two examples, why does the first one print -D and +I for the last two records, while the second one prints -U and +U? What is the rule that determines the kind of change? Thanks.
The reason for the difference has two parts, both defined in GroupAggFunction, the process function that executes the GROUP BY aggregation in this query.
The first is this part of the code:
// update aggregate result and set to the newRow
if (isAccumulateMsg(input)) {
    // accumulate input
    function.accumulate(input);
} else {
    // retract input
    function.retract(input);
}
When a new value is received for a given key, the method first checks whether it is an accumulate message (RowKind.INSERT or RowKind.UPDATE_AFTER) or a retract message (RowKind.UPDATE_BEFORE or RowKind.DELETE).
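The accumulate/retract rule can be sketched in plain Java. This is a minimal, self-contained sketch, not Flink code: Kind is a hypothetical stand-in for org.apache.flink.types.RowKind (its short strings match the +I/-U/+U/-D notation printed in the examples), and MessageClassifier is a made-up helper that applies the same rule as the isAccumulateMsg check in the snippet above. Flink's own check inspects the RowKind of a RowData; this sketch takes the kind directly.

```java
// Hypothetical mirror of Flink's RowKind so the sketch runs without Flink on the classpath.
enum Kind {
    INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

    // Short notation used when changelog rows are printed.
    final String shortString;

    Kind(String shortString) {
        this.shortString = shortString;
    }
}

// Hypothetical helper applying the accumulate/retract rule described above.
final class MessageClassifier {
    private MessageClassifier() {}

    // INSERT and UPDATE_AFTER add their value to the per-key accumulator.
    static boolean isAccumulateMsg(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    // UPDATE_BEFORE and DELETE remove a previously accumulated value.
    static boolean isRetractMsg(Kind kind) {
        return !isAccumulateMsg(kind);
    }
}
```

The point of the split is that the RowKind of the input, not the value, decides whether the accumulator grows or shrinks.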
In your first example, you set the RowKind of each row explicitly. When execution reaches Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)), which is a retract message, the function first retracts that input from the existing accumulator. The only record accumulated for "Alice" so far was the 12, so after the retraction the key's accumulator is empty (its record count is zero), and the following branch is reached:
} else {
    // we retracted the last record for this key
    // sent out a delete message
    if (!firstRow) {
        // prepare delete message for previous row
        resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
        out.collect(resultRow);
    }
    // and clear all state
    accState.clear();
    // cleanup dataview under current key
    function.cleanup();
}
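Since this is not the first row received for the key "Alice", a DELETE message (-D) is emitted for the previous result, and all state for the key is cleared. When the next record (the UPDATE_AFTER with 100) arrives, there is no accumulator for "Alice" anymore, so it is treated as the first row for that key and an INSERT (+I) is emitted.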
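To see how these branches produce the -D/+I pair, here is a simplified, hypothetical model of SUM(score) GROUP BY name. It is an assumption-laden sketch, not the real GroupAggFunction: the per-key accumulator is a plain running sum plus a record count (standing in for the recordCounter), UPDATE_BEFORE is always emitted on updates (as if generateUpdateBefore were true), the unchanged-result shortcut is omitted, and results are rendered as strings in the +I[key, value] form.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Flink's RowKind.
enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

// Simplified, hypothetical model of a per-key SUM aggregation.
final class SumPerKey {
    // Per-key accumulator: running sum plus number of accumulated records.
    private static final class Acc {
        long sum;
        long count;
    }

    private final Map<String, Acc> state = new HashMap<>();
    final List<String> output = new ArrayList<>();

    void process(Kind kind, String key, long value) {
        Acc acc = state.get(key);
        boolean firstRow = acc == null;
        if (firstRow) {
            if (kind == Kind.UPDATE_BEFORE || kind == Kind.DELETE) {
                return; // retraction for a key with no state: nothing to undo
            }
            acc = new Acc();
            state.put(key, acc);
        }
        long prevSum = acc.sum;

        if (kind == Kind.INSERT || kind == Kind.UPDATE_AFTER) {
            acc.sum += value; // accumulate
            acc.count++;
        } else {
            acc.sum -= value; // retract
            acc.count--;
        }

        if (acc.count > 0) {
            if (firstRow) {
                output.add("+I[" + key + ", " + acc.sum + "]");
            } else {
                output.add("-U[" + key + ", " + prevSum + "]");
                output.add("+U[" + key + ", " + acc.sum + "]");
            }
        } else {
            // Count dropped to zero: delete the previous result and forget the key.
            if (!firstRow) {
                output.add("-D[" + key + ", " + prevSum + "]");
            }
            state.remove(key);
        }
    }
}
```

Feeding it Example 1's four rows in order yields +I[Alice, 12], +I[Bob, 5], -D[Alice, 12], +I[Alice, 100]: the UPDATE_BEFORE drops Alice's count to zero, so the key is deleted, and the UPDATE_AFTER then starts a fresh accumulator. Feeding a new instance Example 2's three INSERT rows yields +I[Alice, 12], -U[Alice, 12], +U[Alice, 14], +I[Bob, 12]. The real jobs print Bob first; ordering across different keys is not guaranteed, but the per-key sequence is the same.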
In your second example, where you don't specify the RowKind explicitly, every input row is an INSERT (fromValues produces an insert-only stream). This means nothing is ever retracted from the accumulator, and the following code path is taken:
if (!recordCounter.recordCountIsZero(accumulators)) {
    // we aggregated at least one record for this key
    // update the state
    accState.update(accumulators);
    // if this was not the first row and we have to emit retractions
    if (!firstRow) {
        if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
            // newRow is the same as before and state cleaning is not enabled.
            // We do not emit retraction and acc message.
            // If state cleaning is enabled, we have to emit messages to prevent too early
            // state eviction of downstream operators.
            return;
        } else {
            // retract previous result
            if (generateUpdateBefore) {
                // prepare UPDATE_BEFORE message for previous row
                resultRow
                        .replace(currentKey, prevAggValue)
                        .setRowKind(RowKind.UPDATE_BEFORE);
                out.collect(resultRow);
            }
            // prepare UPDATE_AFTER message for new row
            resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
        }
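Since the record count is greater than zero (nothing was retracted), this is not the first row received for the key, the aggregate value changed (12 to 14), and generateUpdateBefore is true for this query (it is a constructor flag of GroupAggFunction that the planner sets when downstream operators need UPDATE_BEFORE messages), the operator first emits an UPDATE_BEFORE message (-U) for the previous result, immediately followed by an UPDATE_AFTER message (+U) for the new one.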
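The update branch can be isolated into a small function to show how each condition affects the output. This is a hypothetical sketch: UpdateBranch.emit and its parameters are made up for illustration, the aggregate is a single long, and plain == stands in for the generated equaliser.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the "count > 0 and not the first row" branch.
final class UpdateBranch {
    private UpdateBranch() {}

    static List<String> emit(String key, long prevAgg, long newAgg,
                             boolean generateUpdateBefore, long stateRetentionTime) {
        List<String> out = new ArrayList<>();
        if (stateRetentionTime <= 0 && prevAgg == newAgg) {
            // Result unchanged and no state retention configured: emit nothing.
            return out;
        }
        if (generateUpdateBefore) {
            // Retract the previous result first.
            out.add("-U[" + key + ", " + prevAgg + "]");
        }
        // Then emit the new result.
        out.add("+U[" + key + ", " + newAgg + "]");
        return out;
    }
}
```

With generateUpdateBefore = true, moving Alice from 12 to 14 emits -U[Alice, 12] followed by +U[Alice, 14], as in Example 2. With it false, only +U[Alice, 14] is emitted. If the sum does not change and stateRetentionTime <= 0, nothing is emitted; with a positive retention time the unchanged pair is still sent.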
All the relevant code can be found in GroupAggFunction.