Tags: apache-flink, flink-sql

Is there a way to define a Dynamic Table comprised of entries that have NOT been touched by an event recently?


I'm new to Flink and I'm trying to use it to have a bunch of live views of my application. At least one of the dynamic views I'd like to build would be to show entries that have not met an SLA -- or essentially expired -- and the condition for this would be a simple timestamp comparison. So I would basically want an entry to show up in my dynamic table if it has NOT been touched by an event recently. In playing around with Flink 1.6 (constrained to this due to AWS Kinesis) in a dev environment, I'm not seeing that Flink is re-evaluating a condition unless an event touches that entry.

I've got my dev environment plugged into a Kinesis stream that's sending in live access log events from a web server. This isn't my real use case but it was an easy one to begin testing with. I've written a simple table query that pulls in a request path, its last access time, and computes a boolean flag to indicate whether it hasn't been accessed in the last minute. I'm debugging this via a retract stream connected to PrintSinkFunction so all updates/deletes are printed to my console.

tEnv.registerDataStream("AccessLogs", accessLogs, "username, status, request, responseSize, referrer, userAgent, requestTime, ActionTime.rowtime");

Table paths = tEnv.sqlQuery("SELECT request AS path, MAX(requestTime) as lastTime, CASE WHEN MAX(requestTime) < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE THEN 1 ELSE 0 END AS expired FROM AccessLogs GROUP BY request");

DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(paths, Row.class);
retractStream.addSink(new PrintSinkFunction<>());

I expect that when I access a page, an Add event is sent to this stream. Then if I wait 1 minute (do nothing), the CASE statement in my table will evaluate to 1, so I should see a Delete and then Add event with that flag set.

What I actually see is that nothing happens until I load that page again. The Delete event actually has the flag set, while the Add event that immediately follows it has the flag cleared again (as it should, since the entry is no longer "expired").

// add/delete, path, lastAccess, expired
(true,/mypage,2019-05-20 20:02:48.0,0) // first page load, add event
(false,/mypage,2019-05-20 20:02:48.0,1) // second load > 2 mins later, remove event for the entry with expired flag set
(true,/mypage,2019-05-20 20:05:01.0,0) // second load, add event

Edit: The most useful tip I've come across in my searching is to create a ProcessFunction. I think this is something I could make work with my dynamic tables (in some cases I'd end up with intermediate streams to look at computed dates), but hopefully it doesn't have to come to that.

I've gotten the ProcessFunction approach to work but it required a lot more tinkering than I initially thought it would:

  1. I had to add a field to my POJO that changes in the onTimer() method (could be a date or a version that you simply bump each time)
  2. I had to register this field as part of the dynamic table
  3. I had to use this field in my query in order for the query to get re-evaluated and change the boolean flag (even though I don't actually use the new field). I just added it as part of my SELECT clause.
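The three steps above can be illustrated without any Flink dependencies. This is a plain-Java sketch of the idea only, not the actual `ProcessFunction`; the class name `ExpiringEntry`, the method names, and the one-minute SLA are assumptions for illustration:

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative model of the workaround: a per-key entry whose "version"
// field is bumped by a timer callback so the row visibly changes and
// downstream queries that reference the field get re-evaluated.
public class ExpiringEntry {
    private final String path;
    private final Instant lastAccess;
    private long version; // step 1: the field that onTimer() changes

    public ExpiringEntry(String path, Instant lastAccess) {
        this.path = path;
        this.lastAccess = lastAccess;
        this.version = 0L;
    }

    // Stands in for the ProcessFunction's onTimer() callback:
    // bumping the version is enough to make the row "new".
    public void onTimer() {
        this.version++;
    }

    // The boolean flag the SQL CASE expression computes.
    public boolean isExpired(Instant now, Duration sla) {
        return lastAccess.isBefore(now.minus(sla));
    }

    public long getVersion() {
        return version;
    }

    public static void main(String[] args) {
        Instant access = Instant.parse("2019-05-20T20:02:48Z");
        ExpiringEntry entry = new ExpiringEntry("/mypage", access);

        // Two minutes after the last access, a one-minute SLA has lapsed.
        Instant later = access.plus(Duration.ofMinutes(2));
        System.out.println(entry.isExpired(later, Duration.ofMinutes(1))); // true

        // The timer fires and bumps the version, forcing re-evaluation.
        entry.onTimer();
        System.out.println(entry.getVersion()); // 1
    }
}
```

In the real pipeline, `version` is the extra POJO field registered with the dynamic table (step 2) and dragged into the SELECT clause (step 3), even though no consumer cares about its value.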

Solution

  • Your approach looks promising but a comparison with a moving "now" timestamp is not supported by Flink's Table API / SQL (yet).

    I would solve this in two steps.

1. Register the dynamic table in upsert mode, i.e., a table that is upserted per key (request in your case) based on a version timestamp (requestTime in your case). The resulting dynamic table would hold the latest row for every request.
    2. Have a query with a simple filter predicate like yours that compares the version timestamp of the rows of the dynamic (upsert) table and filters out all rows that have timestamps which are too close to now.
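If both pieces were supported, step 2 would reduce to an ordinary filter on the upserted table. A hypothetical sketch, purely to show the shape of the query (the table name `LatestAccess` is an assumption, and this does not run on Flink 1.6):

```sql
-- Hypothetical: the upsert conversion has produced LatestAccess,
-- holding the newest row per request keyed on requestTime.
SELECT request AS path, requestTime AS lastTime
FROM LatestAccess
WHERE requestTime < CURRENT_TIMESTAMP - INTERVAL '1' MINUTE;
```

The filter replaces the CASE expression entirely: rows enter the result when their version timestamp drifts more than a minute behind "now", and retract when a fresh event upserts the key.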

    Unfortunately, neither of these features (upsert conversion and comparison against a moving "now" timestamp) is available in Flink yet. There is some ongoing work on upsert table conversions, though.