Tags: apache-flink, flink-streaming, windowing, apache-calcite, flink-sql

An exponentially decaying moving average over a hopping window in Flink SQL: Casting time


Now that Flink has SQL with fancy windowing, I'm trying to implement the decaying moving average that their SQL roadmap/preview post from 2017-03 lists under "what will be possible in future Flink releases for both the Table API and SQL":

table
  .window(Slide over 1.hour every 1.second as 'w)
  .groupBy('productId, 'w)
  .select(
    'w.end,
    'productId,
    ('unitPrice * ('rowtime - 'w.start).exp() / 1.hour).sum / (('rowtime - 'w.start).exp() / 1.hour).sum)
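
To make the arithmetic concrete outside Flink, here is a minimal Python sketch of a decayed average over a single window: a weighted sum divided by the sum of the weights, with weights that grow with a row's offset from the window start. The function name `decayed_average` and the decay constant `tau` are assumptions for illustration; they come from neither the Flink post nor the Calcite example.

```python
import math

def decayed_average(rows, window_start, tau):
    """Weighted average of the values in one window.

    rows: iterable of (timestamp_seconds, value) pairs
    window_start: window start, in the same unit as the timestamps
    tau: decay constant in seconds (assumed parameter, not from the post)
    """
    weighted_sum = 0.0
    weight_total = 0.0
    for t, y in rows:
        # Rows later in the window get exponentially larger weights.
        w = math.exp((t - window_start) / tau)
        weighted_sum += y * w
        weight_total += w
    # An empty window has no average.
    return weighted_sum / weight_total if weight_total else None

if __name__ == "__main__":
    sample = [(0.0, 1.0), (1.0, 2.0), (2.0, 4.0)]
    print(decayed_average(sample, window_start=0.0, tau=1.0))
```

Dividing by the sum of the weights is what turns the result into an average; a bare SUM(Y * EXP(...)) gives only the numerator.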

Here is my attempt (also inspired by the Calcite decaying example):

SELECT
  lb_index one_key,
  HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND) start_time,
  SUM(Y *
      EXP(
        proctime -
        HOP_START(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)
      ))
FROM write_position
GROUP BY lb_index, HOP(proctime, INTERVAL '0.05' SECOND, INTERVAL '5' SECOND)

Time is processing time, exposed as proctime when the write_position table is created from an append stream:

tEnv.registerTable(
    "write_position", 
    tEnv.fromDataStream(appendStream, "lb_index, Y, proctime.proctime"))

I'm getting this error:

Cannot apply '-' to arguments of type '<TIME ATTRIBUTE(PROCTIME)> - <TIME ATTRIBUTE(PROCTIME)>'. 
Supported form(s): '<NUMERIC> - <NUMERIC>' '<DATETIME_INTERVAL> - <DATETIME_INTERVAL>' '<DATETIME> - <DATETIME_INTERVAL>'

I've tried casting proctime to every other type I know of (in an attempt to reach the NUMERIC promised land), and I just can't find how to make it work.

Am I missing something? Is proctime some very special kind of 'system change number' time that you can't convert? If so, there still must be some way to compare it to the HOP_START(proctime,...) value.


Solution

  • You can use TIMESTAMPDIFF to subtract two timepoints (see the docs). It is used like this:

    TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2)
    

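    where timepointunit can be SECOND, MINUTE, HOUR, DAY, MONTH, or YEAR. The result is the signed number of those units between timepoint1 and timepoint2, so it is a numeric value rather than a time type.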

    I haven't tried this with processing time, but it does work with event time fields, so hopefully it will work there too.
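
    As a rough illustration of the argument order, here is a small Python sketch of the same idea: take the whole seconds elapsed from the first timepoint to the second, then feed that number to an exponential. The helper `timestampdiff_seconds` is a hypothetical stand-in, not a Flink function, and its truncation to whole seconds is an assumption about how a SECOND unit behaves. In the query, the matching call would presumably be TIMESTAMPDIFF(SECOND, HOP_START(...), proctime), which is untested, as said above.

```python
from datetime import datetime
import math

def timestampdiff_seconds(timepoint1, timepoint2):
    """Hypothetical stand-in for TIMESTAMPDIFF(SECOND, timepoint1, timepoint2).

    Returns timepoint2 - timepoint1 as a signed whole number of seconds.
    Truncating toward zero is an assumption of this sketch.
    """
    return int((timepoint2 - timepoint1).total_seconds())

# Example values, made up for illustration.
window_start = datetime(2017, 3, 1, 12, 0, 0)
row_time = datetime(2017, 3, 1, 12, 0, 3)

# The earlier timepoint goes first, so the offset is positive.
elapsed = timestampdiff_seconds(window_start, row_time)

# A plain number can be passed to exp, unlike a time attribute.
weight = math.exp(elapsed)

if __name__ == "__main__":
    print(elapsed, weight)
```

    SECOND is the finest unit listed, so with a 5-second window the offsets would take only a few distinct values; worth checking whether that resolution is enough.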