
toChangelogStream prints different kinds of changes


I am reading the Flink documentation at https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream and have a question about two of its examples.

Example 1:

// === EXAMPLE 1 ===

// interpret the stream as a retract stream

// create a changelog DataStream
val dataStream = env.fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
    Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))


// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(dataStream)

// register the table under a name and perform an aggregation
tableEnv.createTemporaryView("InputTable", table)
tableEnv
    .executeSql("SELECT f0 AS name, SUM(f1) AS score FROM InputTable GROUP BY f0")
    .print()

// prints:
// +----+--------------------------------+-------------+
// | op |                           name |       score |
// +----+--------------------------------+-------------+
// | +I |                            Bob |           5 |
// | +I |                          Alice |          12 |
// | -D |                          Alice |          12 |
// | +I |                          Alice |         100 |
// +----+--------------------------------+-------------+

Example 2:

// === EXAMPLE 2 ===

// convert to DataStream in the simplest and most general way possible (no event-time)

val simpleTable = tableEnv
    .fromValues(row("Alice", 12), row("Alice", 2), row("Bob", 12))
    .as("name", "score")
    .groupBy($"name")
    .select($"name", $"score".sum())

tableEnv
    .toChangelogStream(simpleTable)
    .executeAndCollect()
    .foreach(println)

// prints:
// +I[Bob, 12]
// +I[Alice, 12]
// -U[Alice, 12]
// +U[Alice, 14]

Why does the first example print -D and +I for the last two records, while the second prints -U and +U? What rule determines the kind of change? Thanks.


Solution

  • The difference comes from two code paths, both in GroupAggFunction, the process function that executes this query.

    The first is this part of the code:

    // update aggregate result and set to the newRow
    if (isAccumulateMsg(input)) {
        // accumulate input
        function.accumulate(input);
    } else {
        // retract input
        function.retract(input);
    }
    

    When a new record arrives for a given key, the function first checks whether it is an accumulate message (RowKind.INSERT or RowKind.UPDATE_AFTER) or a retract message (RowKind.UPDATE_BEFORE or RowKind.DELETE).

    In your first example, you set the RowKind explicitly. When execution reaches Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", Int.box(12)), which is a retract message, the function retracts that input from the existing accumulator. After the retraction, the key "Alice" is left with an empty accumulator (its record count is zero), so the following branch is taken:

    } else {
        // we retracted the last record for this key
        // sent out a delete message
        if (!firstRow) {
            // prepare delete message for previous row
            resultRow.replace(currentKey, prevAggValue).setRowKind(RowKind.DELETE);
            out.collect(resultRow);
        }
        // and clear all state
        accState.clear();
        // cleanup dataview under current key
        function.cleanup();
    }
    

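    Since this is not the first row received for the key "Alice", a DELETE message (-D) is emitted for the previous result, and the state for the key is cleared. The next record, the UPDATE_AFTER with 100, is therefore treated as the first row for "Alice" and produces an INSERT (+I).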

    In your second example you do not specify a RowKind, so all rows arrive with RowKind.INSERT by default. Nothing is retracted from the existing accumulator, and the following code path is taken:

    if (!recordCounter.recordCountIsZero(accumulators)) {
        // we aggregated at least one record for this key
    
        // update the state
        accState.update(accumulators);
    
        // if this was not the first row and we have to emit retractions
        if (!firstRow) {
            if (stateRetentionTime <= 0 && equaliser.equals(prevAggValue, newAggValue)) {
                // newRow is the same as before and state cleaning is not enabled.
                // We do not emit retraction and acc message.
                // If state cleaning is enabled, we have to emit messages to prevent too early
                // state eviction of downstream operators.
                return;
            } else {
                // retract previous result
                if (generateUpdateBefore) {
                    // prepare UPDATE_BEFORE message for previous row
                    resultRow
                            .replace(currentKey, prevAggValue)
                            .setRowKind(RowKind.UPDATE_BEFORE);
                    out.collect(resultRow);
                }
                // prepare UPDATE_AFTER message for new row
                resultRow.replace(currentKey, newAggValue).setRowKind(RowKind.UPDATE_AFTER);
            }
    

    Since the record count is greater than 0 (nothing was retracted), this is not the first row received for the key, and generateUpdateBefore is set to true for this aggregation, the function emits an UPDATE_BEFORE message (-U) followed immediately by an UPDATE_AFTER message (+U).

    All the relevant code is in GroupAggFunction.
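
To see both paths side by side, here is a minimal sketch in plain Java that models the rules described above without depending on Flink. It is not the Flink implementation: the names GroupAggSketch, Kind, Change, Acc and run are hypothetical, the aggregate is hard-coded to SUM, generateUpdateBefore is assumed to be true, state retention is assumed to be disabled, and records are processed sequentially, so the ordering across keys can differ from a real parallel job.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, Flink-free model of the emission rules described above.
// All names here are hypothetical stand-ins, not Flink classes.
final class GroupAggSketch {

    // Mirrors the four RowKind values and their short strings.
    enum Kind {
        INSERT("+I"), UPDATE_BEFORE("-U"), UPDATE_AFTER("+U"), DELETE("-D");

        final String tag;

        Kind(String tag) {
            this.tag = tag;
        }
    }

    // One input change: row kind, grouping key, value to be summed.
    static final class Change {
        final Kind kind;
        final String name;
        final int score;

        Change(Kind kind, String name, int score) {
            this.kind = kind;
            this.name = name;
            this.score = score;
        }
    }

    // Per-key accumulator: running sum plus the number of live input records.
    static final class Acc {
        long sum;
        long count;
    }

    // INSERT and UPDATE_AFTER accumulate; UPDATE_BEFORE and DELETE retract.
    static boolean isAccumulateMsg(Change c) {
        return c.kind == Kind.INSERT || c.kind == Kind.UPDATE_AFTER;
    }

    static String format(Kind kind, String name, long value) {
        return kind.tag + "[" + name + ", " + value + "]";
    }

    // Processes the changes in order and returns the emitted result rows.
    static List<String> run(List<Change> input) {
        Map<String, Acc> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Change in : input) {
            Acc acc = state.get(in.name);
            boolean firstRow = (acc == null);
            if (firstRow) {
                if (!isAccumulateMsg(in)) {
                    continue; // nothing to retract for a key with no state
                }
                acc = new Acc();
            }
            long prevSum = acc.sum;
            if (isAccumulateMsg(in)) {
                acc.sum += in.score;
                acc.count++;
            } else {
                acc.sum -= in.score;
                acc.count--;
            }
            if (acc.count != 0) {
                // At least one record is still aggregated for this key.
                state.put(in.name, acc);
                if (firstRow) {
                    out.add(format(Kind.INSERT, in.name, acc.sum));
                } else if (prevSum != acc.sum) {
                    out.add(format(Kind.UPDATE_BEFORE, in.name, prevSum));
                    out.add(format(Kind.UPDATE_AFTER, in.name, acc.sum));
                }
            } else {
                // The last record for this key was retracted: delete and clear state.
                // The count can only reach zero for a key that was seen before.
                out.add(format(Kind.DELETE, in.name, prevSum));
                state.remove(in.name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Example 1: explicit row kinds, including a retraction.
        run(Arrays.asList(
                new Change(Kind.INSERT, "Alice", 12),
                new Change(Kind.INSERT, "Bob", 5),
                new Change(Kind.UPDATE_BEFORE, "Alice", 12),
                new Change(Kind.UPDATE_AFTER, "Alice", 100))).forEach(System.out::println);
        System.out.println("---");
        // Example 2: insert-only input.
        run(Arrays.asList(
                new Change(Kind.INSERT, "Alice", 12),
                new Change(Kind.INSERT, "Alice", 2),
                new Change(Kind.INSERT, "Bob", 12))).forEach(System.out::println);
    }
}
```

With the first input, "Alice" goes through +I, -D, +I, because the retraction empties her accumulator before the new value arrives. With the second input, "Alice" goes through +I, -U, +U, because her accumulator never becomes empty.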