apache-flink flink-streaming stream-processing apache-calcite flink-sql

Flink: Convert a retracting SQL to an appending SQL, using only SQL, to feed a temporal table


I am providing a Flink SQL interface to users, so I can't really use the Table API or the Java/Scala APIs: everything needs to be specified in SQL. I can, however, parse comments in the SQL files and use them to apply ad hoc lower-level API instructions.

How could a user convert, say:

SELECT b, AVG(a) "average" FROM source_data GROUP BY b

name: average_source_data_retracting
  b STRING
  average NUMERIC

(which emits retractions) to a form that only appends? This appending form could have the following schema:

name: average_source_data_appending
  flag BOOLEAN <-- indicating an accumulate or retract message
  b STRING
  average NUMERIC

In other words, something like what a RetractStreamTableSink receives (a flag plus the row), but exposed as an append-only table rather than as a sink.
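
To make the two schemas concrete, here is a toy sketch in plain Java (no Flink dependency) of the changelog that a grouped average produces when each retract or accumulate message is written out as an ordinary row with a flag. The class and method names are hypothetical, and the emission rule (retract the old average, then accumulate the new one) is a simplifying assumption, not Flink's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of the changelog behind
//   SELECT b, AVG(a) "average" FROM source_data GROUP BY b
// Hypothetical names; this is NOT Flink code.
class RunningAverageChangelog {

    /** One appended row: flag == true is an accumulate message, false a retract message. */
    static final class Message {
        final boolean flag;
        final String b;
        final double average;

        Message(boolean flag, String b, double average) {
            this.flag = flag;
            this.b = b;
            this.average = average;
        }

        @Override
        public String toString() {
            return (flag ? "+" : "-") + "(" + b + ", " + average + ")";
        }
    }

    // Per-key accumulator: index 0 holds the sum, index 1 the count.
    private final Map<String, double[]> sumAndCount = new HashMap<>();

    /** Feeds one source row and returns the messages it causes, in emission order. */
    List<Message> onRow(String b, double a) {
        List<Message> out = new ArrayList<>();
        double[] acc = sumAndCount.get(b);
        if (acc == null) {
            // First row for this key: nothing to retract yet.
            acc = new double[] {0.0, 0.0};
            sumAndCount.put(b, acc);
        } else {
            // Withdraw the previously emitted average for this key.
            out.add(new Message(false, b, acc[0] / acc[1]));
        }
        acc[0] += a;
        acc[1] += 1;
        // Emit the updated average.
        out.add(new Message(true, b, acc[0] / acc[1]));
        return out;
    }

    public static void main(String[] args) {
        RunningAverageChangelog agg = new RunningAverageChangelog();
        System.out.println(agg.onRow("x", 10.0));
        System.out.println(agg.onRow("x", 20.0));
    }
}
```

Every element of the returned lists corresponds to one row of average_source_data_appending; filtering on the flag would keep only the accumulate messages.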

The goal is to use average_source_data_appending (with the retract messages filtered out) to create a Temporal table, since this kind of table only accepts append-only source tables.

I have considered using windows (as talked about here), but I'd like the updates to the temporal table to be instantaneous.


Solution

  • Please disregard this question: apparently, Temporal Table Functions can accept tables which are (to me) retracting.

    Something to the effect of:

    SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b
    

    This can be accepted as a Temporal Table Function with b as the key and max_proctime as the time attribute. I guess the MAX(proctime) somehow makes it think new rows are emitted, when they are only overwritten? I need more time to understand this.

    EDIT:

    Digging through the source code, we find that Temporal Table Functions seem to accept retracting definitions, but only if it is under processing time:

    TemporalProcessTimeJoinOperator.java:

    @Override
    public void processElement2(StreamRecord<BaseRow> element) throws Exception {
        if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
            rightState.update(element.getValue());
            registerProcessingCleanupTimer();
        } else {
            rightState.clear();
            cleanupLastTimer();
        }
    }
    

    TemporalRowTimeJoinOperator.java:

    @Override
    public void processElement2(StreamRecord<BaseRow> element) throws Exception {
        ...
        checkNotRetraction(row);
        ...
    }
    private void checkNotRetraction(BaseRow row) {
        if (BaseRowUtil.isRetractMsg(row)) {
            String className = getClass().getSimpleName();
            throw new IllegalStateException(
                "Retractions are not supported by " + className +
                    ". If this can happen it should be validated during planning!");
        }
    }
    

    This is undocumented; I don't know whether this behavior is permanent, or whether the documentation will be updated.
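
The difference between the two operators quoted above can be sketched with a toy model in plain Java (no Flink dependency). All names here are hypothetical, a HashMap stands in for Flink's keyed state, and timers and row-time versioning are left out, so this only illustrates how each side reacts to a retract message.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of the right-hand (temporal table) input of the two join operators.
// Hypothetical names; this is NOT Flink code.
class TemporalRightSideSketch {

    /** Processing-time flavour: keeps the latest accumulated value per key. */
    static final class ProcessingTimeSide {
        private final Map<String, Double> latest = new HashMap<>();

        void onMessage(boolean accumulate, String key, double value) {
            if (accumulate) {
                latest.put(key, value);   // like rightState.update(...)
            } else {
                latest.remove(key);       // like rightState.clear()
            }
        }

        /** Returns the current value for the key, or null if none is stored. */
        Double lookup(String key) {
            return latest.get(key);
        }
    }

    /** Row-time flavour: rejects retract messages, like checkNotRetraction(...). */
    static final class RowTimeSide {
        private final Map<String, Double> latest = new HashMap<>();

        void onMessage(boolean accumulate, String key, double value) {
            if (!accumulate) {
                throw new IllegalStateException(
                    "Retractions are not supported by " + getClass().getSimpleName());
            }
            latest.put(key, value);
        }
    }

    public static void main(String[] args) {
        ProcessingTimeSide side = new ProcessingTimeSide();
        side.onMessage(true, "x", 10.0);   // accumulate the first average
        side.onMessage(false, "x", 10.0);  // retract it
        side.onMessage(true, "x", 15.0);   // accumulate the updated average
        System.out.println(side.lookup("x"));
    }
}
```

In this model, a retract followed by an accumulate simply leaves the newest value in place on the processing-time side, which would be consistent with the grouped query being accepted there, while the row-time side fails on the first retract message.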