apache-flink flink-table-api

Wrong result in Apache Flink full outer join


I have 2 data streams which were created from 2 tables like:

Table orderRes1 = ste.sqlQuery(
               "SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble +
                       " Group by orderId, userId");

Table orderRes2 = ste.sqlQuery(
               "SELECT orderId, userId, SUM(askPrice) as q FROM " + tble +
                       " Group by orderId, userId");

DataStream<Tuple2<Boolean, Row>> ds1 = ste.toRetractStream(orderRes1 , Row.class).
               filter(order-> order.f0);

DataStream<Tuple2<Boolean, Row>> ds2 = ste.toRetractStream(orderRes2 , Row.class).
               filter(order-> order.f0);

I want to perform a full outer join on these 2 streams. I tried both orderRes1.fullOuterJoin(orderRes2 ,$(exp)) and a SQL query containing a full outer join, as below:

  Table bidOrdr = ste.fromDataStream(bidTuple, $("orderId"),
                $("userId"), $("price"));
  
  Table askOrdr = ste.fromDataStream(askTuple, $("orderId"),
                $("userId"), $("price"));

 Table result = ste.sqlQuery(
                "SELECT COALESCE(bidTbl.orderId,askTbl.orderId) , " +
                        " COALESCE(bidTbl.userId,askTbl.userId)," +
                        " COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice, " +
                        " COALESCE(askTbl.askTotalPrice,0) as askTotalPrice " + 
                        " FROM " +
                        " (SELECT orderId, userId," +
                        " SUM(price) AS bidTotalPrice " +
                        " FROM " + bidOrdr +
                        " Group by orderId, userId) bidTbl full outer JOIN " +
                        " (SELECT orderId, userId," +
                        " SUM(price) AS askTotalPrice" +
                        " FROM " + askOrdr +
                        " Group by orderId, userId) askTbl " +
                        " ON (bidTbl.orderId = askTbl.orderId" +
                        " AND bidTbl.userId= askTbl.userId) ") ;

 DataStream<Tuple2<Boolean, Row>> resultStream = ste.toRetractStream(result, Row.class).filter(order -> order.f0);

However, the result is not correct in some cases. Imagine user A sells to B at some price 3 times, and after that user B sells to A 2 times; the second time, the result is:

7> (true,123,a,300.0,0.0)

7> (true,123,a,300.0,200.0)

10> (true,123,b,0.0,300.0)

10> (true,123,b,200.0,300.0)

The second and fourth lines are the expected result of the stream, but the first and third lines are emitted too. It is worth mentioning that coGroup is another possible solution, but I do not want to use windowing in this scenario, and a non-windowed version is only available for bounded streams (DataSet).

Note: orderId and userId will repeat in both streams, and I want to produce 2 rows for each action, containing: orderId, userId1, bidTotalPrice, askTotalPrice AND orderId, userId2, bidTotalPrice, askTotalPrice


Solution

  • Something like this is to be expected with streaming queries (or in other words, with queries executed on dynamic tables). Unlike a traditional database, where the input relations to a query are kept static during query execution, the inputs to a streaming query are being continuously updated -- and so the result must also be continuously updated.

    If I understand the setup here, the "incorrect" results on lines 1 and 3 are correct up until the relevant rows from orderRes2 are processed. If those rows never arrive, then lines 1 and 3 will remain correct.

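    What you should expect is an eventually correct result, including retractions as necessary. You can reduce the number of intermediate results by turning on mini-batch aggregation (the table.exec.mini-batch.enabled option, together with table.exec.mini-batch.allow-latency and table.exec.mini-batch.size).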

    This mailing list thread gives more insight. If I've misunderstood your situation, please provide a reproducible example that illustrates the problem.
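
To make "eventually correct, including retractions" concrete, here is a minimal plain-Java sketch (no Flink dependency) that folds a retract-style changelog into the latest row per (orderId, userId). The class RetractFold, the Change type, and the sample messages are hypothetical: the add messages mirror the four output lines in the question, while the retract messages are an assumption about what the filter(order -> order.f0) call discarded.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RetractFold {
    // One changelog message: add == true inserts a row, add == false retracts an earlier one.
    static final class Change {
        final boolean add;
        final String orderId;
        final String userId;
        final double bid;
        final double ask;

        Change(boolean add, String orderId, String userId, double bid, double ask) {
            this.add = add;
            this.orderId = orderId;
            this.userId = userId;
            this.bid = bid;
            this.ask = ask;
        }
    }

    // Hypothetical changelog: adds taken from the question, retractions assumed.
    static List<Change> sample() {
        List<Change> log = new ArrayList<>();
        log.add(new Change(true, "123", "a", 300.0, 0.0));    // intermediate result
        log.add(new Change(true, "123", "b", 0.0, 300.0));    // intermediate result
        log.add(new Change(false, "123", "a", 300.0, 0.0));   // retract the stale row
        log.add(new Change(true, "123", "a", 300.0, 200.0));  // updated row
        log.add(new Change(false, "123", "b", 0.0, 300.0));   // retract the stale row
        log.add(new Change(true, "123", "b", 200.0, 300.0));  // updated row
        return log;
    }

    // Apply every message in order; only the current row per key survives.
    static Map<String, double[]> materialize(List<Change> changes) {
        Map<String, double[]> state = new LinkedHashMap<>();
        for (Change c : changes) {
            String key = c.orderId + "/" + c.userId;
            if (c.add) {
                state.put(key, new double[] {c.bid, c.ask});
            } else {
                state.remove(key);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, double[]> e : materialize(sample()).entrySet()) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue()));
        }
    }
}
```

Under these assumptions, only the rows (300.0, 200.0) for user a and (200.0, 300.0) for user b remain once all messages are applied. A consumer that keeps only the add messages sees all four rows instead, which matches the output in the question.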