
Flink SQL: Outer Join with Group By gives unexpected output


I have two Flink dynamic tables, Event and Configuration.

Event has the structure [id, myTimestamp] and Configuration has the structure [id, myValue, myTimestamp].

I am trying to write a Flink SQL query that returns (Event.id, Configuration.myValue), or (Event.id, null) if the id of the Event row does not match any id from Configuration.

Example of expected behavior (Event and Configuration start empty):

The example should be read as:

[DATA_RECEIVED] => TARGET_TABLE : EXPECTED_OUTPUT

Since the SQL query contains a join, its result is written to an UpsertSink (the first value of each output tuple is the upsert boolean).

[myId-1, 10]            => EventTable           : [(true, myId-1, null)]
[myId-1, myValue-A, 15] => ConfigurationTable   : [(false, myId-1, null), (true, myId-1, myValue-A)]
[myId-1, myValue-A, 20] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, myValue-A)]
[myId-1, myValue-B, 25] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, myValue-B)]
[myId-1, 30]            => EventTable           : [(false, myId-1, null), (true, myId-1, myValue-B)]

So I wrote this query:

SELECT
   Event.id,
   Configuration.myValue
FROM
  (SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id) as Event
    LEFT JOIN
  (SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue) as Configuration
    ON Event.id = Configuration.id
GROUP BY Event.id, Configuration.myValue

where LATEST_VAL is a UDF that returns the myValue associated with MAX(myTimestamp).
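To make the intended semantics of LATEST_VAL concrete, here is a minimal plain-Python model of it. It is a sketch, not Flink's API: a real Flink 1.8 UDAF would be a Java or Scala AggregateFunction with createAccumulator, accumulate and getValue methods. The accumulator layout and the rule that an equal timestamp keeps the earlier value are assumptions.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LatestValAccumulator:
    """Hypothetical accumulator: the value seen with the largest timestamp so far."""
    value: Optional[str] = None
    timestamp: Optional[int] = None


def accumulate(acc: LatestValAccumulator, value: str, timestamp: int) -> None:
    # Replace the stored value only when a strictly newer timestamp arrives
    # (assumption: on a tie, the earlier value is kept).
    if acc.timestamp is None or timestamp > acc.timestamp:
        acc.value = value
        acc.timestamp = timestamp


def get_value(acc: LatestValAccumulator) -> Optional[str]:
    return acc.value


acc = LatestValAccumulator()
for value, ts in [("myValue-A", 15), ("myValue-A", 20), ("myValue-B", 25)]:
    accumulate(acc, value, ts)
print(get_value(acc))  # myValue-B
```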

But I observe behavior that I do not understand. Here are the observed results:

[myId-1, 10]            => EventTable           : [(true, myId-1, null)] // OK
[myId-1, myValue-A, 15] => ConfigurationTable   : [(false, myId-1, null), (true, myId-1, myValue-A)] // OK
[myId-1, myValue-A, 20] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-A)] // NOT OK
[myId-1, myValue-B, 25] => ConfigurationTable   : [(false, myId-1, myValue-A), (true, myId-1, null), (false, myId-1, null), (true, myId-1, myValue-B)] // NOT OK
[myId-1, 30]            => EventTable           : [(false, myId-1, null), (true, myId-1, myValue-B)] // OK

How do you explain the difference between the expected and the observed behavior? Why are there the extra outputs (true, myId-1, null), (false, myId-1, null)?

Is it possible to adapt the SQL query to get the desired behavior?

Note:

  • I am using Flink 1.8

Solution

  • I think the one bit you missed is that you are actually joining two retract streams. Even though your input streams are append-only, you are performing aggregations over them in the subqueries, and those aggregations produce retractions.

    Let's first analyze the results of the subqueries:

    Subquery 1:

    Query: SELECT id, MAX(myTimestamp) as myTimestamp FROM Event GROUP BY id
    
    Resulting stream:
     (true, myId-1, 10L)
     (false, myId-1, 10L)
     (true, myId-1, 30L)
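The retractions in that stream come from the non-windowed aggregation updating its result. A simplified plain-Python model of GROUP BY id with MAX(myTimestamp) over the append-only Event rows reproduces it. group_max is a hypothetical helper, not a Flink operator, and this model simply emits nothing when a group's result does not change.

```python
from typing import Dict, List, Tuple


def group_max(rows: List[Tuple[str, int]]) -> List[Tuple[bool, str, int]]:
    """Model of SELECT id, MAX(myTimestamp) ... GROUP BY id over an append-only input."""
    current: Dict[str, int] = {}  # id -> current MAX(myTimestamp)
    changelog: List[Tuple[bool, str, int]] = []
    for key, ts in rows:
        old = current.get(key)
        new = ts if old is None else max(old, ts)
        if new == old:
            continue  # this sketch emits nothing when the group's result is unchanged
        if old is not None:
            changelog.append((False, key, old))  # retract the previous result
        changelog.append((True, key, new))  # emit the updated result
        current[key] = new
    return changelog


for change in group_max([("myId-1", 10), ("myId-1", 30)]):
    print(change)
# (True, 'myId-1', 10)
# (False, 'myId-1', 10)
# (True, 'myId-1', 30)
```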
    

    Subquery 2:

    Query: SELECT id, LATEST_VAL(myValue, myTimestamp) as myValue, MAX(myTimestamp) as myTimestamp FROM Configuration GROUP BY id, myValue
    
    Resulting stream:
     (true, "myId-1", "myValue-A", 15L)
     (false, "myId-1", "myValue-A", 15L)
     (true, "myId-1", "myValue-A", 20L)
     (false, "myId-1", "myValue-A", 20L)
     (true, "myId-1", "myValue-B", 25L)
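The same mechanism applies to the Configuration subquery. The stream above corresponds to grouping by id only (with GROUP BY id, myValue as written, myValue-B would start a separate group and (false, myId-1, myValue-A, 20L) would not be emitted), so the following plain-Python sketch assumes GROUP BY id. group_latest is a hypothetical helper that updates the LATEST_VAL and MAX results together.

```python
from typing import Dict, List, Tuple

ConfigRow = Tuple[str, str, int]  # (id, myValue, myTimestamp)


def group_latest(rows: List[ConfigRow]) -> List[Tuple[bool, str, str, int]]:
    """Model of LATEST_VAL(myValue, myTimestamp), MAX(myTimestamp) grouped by id only."""
    current: Dict[str, Tuple[str, int]] = {}  # id -> (latest myValue, max myTimestamp)
    changelog: List[Tuple[bool, str, str, int]] = []
    for key, value, ts in rows:
        old = current.get(key)
        new = (value, ts) if old is None or ts > old[1] else old
        if new == old:
            continue  # older or equal timestamp: result unchanged, nothing emitted
        if old is not None:
            changelog.append((False, key, old[0], old[1]))  # retract previous result
        changelog.append((True, key, new[0], new[1]))  # emit updated result
        current[key] = new
    return changelog


rows = [("myId-1", "myValue-A", 15), ("myId-1", "myValue-A", 20), ("myId-1", "myValue-B", 25)]
for change in group_latest(rows):
    print(change)
```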
    

    After that, you perform the join and the grouping on top of those two retraction streams. With that in mind, what is actually joined and grouped in your example is:

    [true, myId-1, 10]             : [(true, myId-1, null)]
    [true, myId-1, myValue-A, 15]  : [(false, myId-1, null), (true, myId-1, myValue-A)]
    [false, myId-1, myValue-A, 15] : [(false, myId-1, myValue-A), (true, myId-1, null)]
    [true, myId-1, myValue-A, 20]  : [(false, myId-1, null), (true, myId-1, myValue-A)]
    [false, myId-1, myValue-A, 20] : [(false, myId-1, myValue-A), (true, myId-1, null)]
    [true, myId-1, myValue-B, 25]  : [(false, myId-1, null), (true, myId-1, myValue-B)]
    ...
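A simplified model of a left outer join over two retract streams shows where the extra (true, myId-1, null) / (false, myId-1, null) pairs come from: when the last matching Configuration row is retracted, the join falls back to the null-padded row, and it withdraws that row again as soon as a new match arrives. The sketch below is plain Python with a hypothetical left_join helper, not Flink's join operator. It keeps both sides in memory, projects (Event.id, Configuration.myValue), and models only the join; in this trace each (id, myValue) key is present at most once at a time, so under this model the outer GROUP BY passes the same changes through.

```python
from typing import List, Optional, Tuple

EventRow = Tuple[str, int]  # (id, myTimestamp)
ConfigRow = Tuple[str, str, int]  # (id, myValue, myTimestamp)
Output = Tuple[bool, str, Optional[str]]  # (flag, Event.id, Configuration.myValue)


def left_join(messages) -> List[List[Output]]:
    """Simplified LEFT JOIN on id over two retract streams.

    Each message is ("L", flag, EventRow) or ("R", flag, ConfigRow).
    Returns the projected rows emitted for each input message.
    """
    left: List[EventRow] = []
    right: List[ConfigRow] = []
    result: List[List[Output]] = []
    for side, flag, row in messages:
        key = row[0]
        emitted: List[Output] = []
        if side == "L":
            if flag:
                left.append(row)
            else:
                left.remove(row)
            matches = [r for r in right if r[0] == key]
            if matches:
                emitted += [(flag, key, r[1]) for r in matches]
            else:
                emitted.append((flag, key, None))  # null-padded row
        else:
            had_match = any(r[0] == key for r in right)
            if flag:
                right.append(row)
            else:
                right.remove(row)
            has_match = any(r[0] == key for r in right)
            for ev in (e for e in left if e[0] == key):
                if flag:
                    if not had_match:
                        emitted.append((False, ev[0], None))  # withdraw null-padded row
                    emitted.append((True, ev[0], row[1]))
                else:
                    emitted.append((False, ev[0], row[1]))
                    if not has_match:
                        emitted.append((True, ev[0], None))  # fall back to null-padded row
        result.append(emitted)
    return result


messages = [
    ("L", True, ("myId-1", 10)),
    ("R", True, ("myId-1", "myValue-A", 15)),
    ("R", False, ("myId-1", "myValue-A", 15)),
    ("R", True, ("myId-1", "myValue-A", 20)),
    ("R", False, ("myId-1", "myValue-A", 20)),
    ("R", True, ("myId-1", "myValue-B", 25)),
]
for emitted in left_join(messages):
    print(emitted)
```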
    

    Overall, as far as I can tell, it produces correct results: for each input row, the last emitted row represents the most recent value for the given id.
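One way to check that claim against the question's tables is to replay both the expected and the observed changelogs into a sink and compare the materialized state after every input row. The sketch below assumes a hypothetical sink keyed by id alone, where False deletes that id's row and True inserts or overwrites it; that keying is an assumption for illustration, not necessarily how your UpsertSink is keyed.

```python
from typing import Dict, List, Optional, Tuple

Change = Tuple[bool, str, Optional[str]]  # (flag, id, myValue); None stands for null
A, B = "myValue-A", "myValue-B"

# Changelogs per input row, copied from the expected and observed tables in the question.
expected: List[List[Change]] = [
    [(True, "myId-1", None)],
    [(False, "myId-1", None), (True, "myId-1", A)],
    [(False, "myId-1", A), (True, "myId-1", A)],
    [(False, "myId-1", A), (True, "myId-1", B)],
    [(False, "myId-1", None), (True, "myId-1", B)],
]
observed: List[List[Change]] = [
    [(True, "myId-1", None)],
    [(False, "myId-1", None), (True, "myId-1", A)],
    [(False, "myId-1", A), (True, "myId-1", None), (False, "myId-1", None), (True, "myId-1", A)],
    [(False, "myId-1", A), (True, "myId-1", None), (False, "myId-1", None), (True, "myId-1", B)],
    [(False, "myId-1", None), (True, "myId-1", B)],
]


def materialize(changelog: List[List[Change]]) -> List[Dict[str, Optional[str]]]:
    """Replay into a hypothetical sink keyed by id; snapshot the state after each input row."""
    state: Dict[str, Optional[str]] = {}
    snapshots = []
    for changes in changelog:
        for flag, key, value in changes:
            if flag:
                state[key] = value  # insert or overwrite this id's row
            else:
                state.pop(key, None)  # delete this id's row
        snapshots.append(dict(state))
    return snapshots


for step, (exp, obs) in enumerate(zip(materialize(expected), materialize(observed)), start=1):
    print(step, exp, obs, exp == obs)
```

Under this assumption the intermediate (true, myId-1, null) / (false, myId-1, null) pair cancels out within each input row, so both changelogs leave the sink in the same state after every step.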