I am providing a Flink SQL interface to users, so I can't really use the Table API or the Java/Scala interfaces. Everything needs to be specified in SQL. I can parse comments in the SQL files, though, and use them to apply ad hoc lower-level API instructions.
How could a user convert, say:
SELECT b, AVG(a) "average" FROM source_data GROUP BY b
name: average_source_data_retracting
b STRING
average NUMERIC
(which retracts values) into a form that appends them instead? This appending form could have the following schema:
name: average_source_data_appending
flag BOOLEAN <-- indicating an accumulate or retract message
b STRING
average NUMERIC
In other words, sort of a RetractStreamTableSink counterpart to the AppendStreamTableSink, but without it being a sink.
The goal is to use average_source_data_appending (with the retract messages filtered out) to create a temporal table, since this kind of table only accepts append-only source tables.
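To make the target schema concrete, here is a minimal plain-Java sketch of what such an appending form would contain. It does not use Flink at all: the class and method names (RetractToAppendSketch, ChangeRow, process) are hypothetical, and the running average is only a stand-in for the AVG(a) ... GROUP BY b query above. Each update to a key appends a retract row for the old average followed by an accumulate row for the new one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (no Flink dependency) of a retract stream written out as append-only rows. */
final class RetractToAppendSketch {

    /** One appended row: flag == true is an accumulate message, flag == false a retract message. */
    static final class ChangeRow {
        final boolean flag;
        final String b;
        final double average;

        ChangeRow(boolean flag, String b, double average) {
            this.flag = flag;
            this.b = b;
            this.average = average;
        }

        @Override
        public String toString() {
            return "(" + flag + ", " + b + ", " + average + ")";
        }
    }

    // Per-key accumulator: index 0 holds the sum of a, index 1 holds the count.
    private final Map<String, double[]> sumAndCount = new HashMap<>();

    /** Feeds one (b, a) record of source_data and returns the rows appended as a result. */
    List<ChangeRow> process(String b, double a) {
        List<ChangeRow> out = new ArrayList<>();
        double[] acc = sumAndCount.get(b);
        if (acc == null) {
            acc = new double[] {0.0, 0.0};
            sumAndCount.put(b, acc);
        } else {
            // The previously emitted average is no longer valid: append a retract row for it.
            out.add(new ChangeRow(false, b, acc[0] / acc[1]));
        }
        acc[0] += a;
        acc[1] += 1;
        // Append the accumulate row carrying the new average.
        out.add(new ChangeRow(true, b, acc[0] / acc[1]));
        return out;
    }

    public static void main(String[] args) {
        RetractToAppendSketch sketch = new RetractToAppendSketch();
        for (ChangeRow row : sketch.process("x", 2.0)) {
            System.out.println(row);
        }
        for (ChangeRow row : sketch.process("x", 4.0)) {
            System.out.println(row);
        }
    }
}
```

Keeping only the rows with flag = true would then give, per key, the sequence of averages that a temporal table could be built on.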
I have considered using windows (as talked about here), but I'd like the updates to the temporal table to be instantaneous.
Please disregard this question, apparently the Temporal Table Functions can accept tables which are (to me) retracting.
Something to the effect of:
SELECT b, AVG(a) "average", MAX(proctime) max_proctime FROM source_data GROUP BY b
can be accepted as a Temporal Table Function with b as the key and max_proctime as the time attribute. I guess the MAX(proctime) somehow makes it think new rows are emitted, when they are only overwritten? I think I need more time to understand this.
EDIT:
Digging through the source code, we find that Temporal Table Functions seem to accept retracting definitions, but only if it is under processing time:
TemporalProcessTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    if (BaseRowUtil.isAccumulateMsg(element.getValue())) {
        rightState.update(element.getValue());
        registerProcessingCleanupTimer();
    } else {
        rightState.clear();
        cleanupLastTimer();
    }
}
TemporalRowTimeJoinOperator.java:
@Override
public void processElement2(StreamRecord<BaseRow> element) throws Exception {
    ...
    checkNotRetraction(row);
    ...
}

private void checkNotRetraction(BaseRow row) {
    if (BaseRowUtil.isRetractMsg(row)) {
        String className = getClass().getSimpleName();
        throw new IllegalStateException(
            "Retractions are not supported by " + className +
            ". If this can happen it should be validated during planning!");
    }
}
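The difference between the two excerpts can be modelled in a few lines of plain Java. This is only a sketch of the behaviour visible above, not the Flink implementation: the class names are hypothetical, rows are represented as strings, and timers and per-key state handling are left out. Under processing time, a retract followed by an accumulate simply ends up overwriting the stored right-side row; under row time, the first retract fails.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of how the two operators above treat messages on the right-hand input. */
final class TemporalJoinRightSideSketch {

    /** Models the processing-time operator: keeps only the latest right-side row. */
    static final class ProcessingTimeSide {
        private String latest; // stands in for rightState

        void onMessage(boolean accumulate, String row) {
            if (accumulate) {
                latest = row;  // like rightState.update(...)
            } else {
                latest = null; // like rightState.clear()
            }
        }

        String current() {
            return latest;
        }
    }

    /** Models the row-time operator: rejects retract messages outright. */
    static final class RowTimeSide {
        private final List<String> rows = new ArrayList<>();

        void onMessage(boolean accumulate, String row) {
            if (!accumulate) {
                // Mirrors checkNotRetraction(...) in the excerpt above.
                throw new IllegalStateException("Retractions are not supported");
            }
            rows.add(row);
        }

        int size() {
            return rows.size();
        }
    }

    public static void main(String[] args) {
        ProcessingTimeSide procTime = new ProcessingTimeSide();
        procTime.onMessage(true, "x,2.0");  // first average for key x
        procTime.onMessage(false, "x,2.0"); // retract the old average
        procTime.onMessage(true, "x,3.0");  // accumulate the new average
        System.out.println(procTime.current());

        RowTimeSide rowTime = new RowTimeSide();
        rowTime.onMessage(true, "x,2.0");
        try {
            rowTime.onMessage(false, "x,2.0");
        } catch (IllegalStateException e) {
            System.out.println("row time: " + e.getMessage());
        }
    }
}
```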
This is undocumented; I don't know whether this behaviour is permanent or whether the documentation will be updated.