
Apache Flink: LEFT JOIN with a TableFunction does not return expected result

Flink version: 1.3.1

I created two tables: one from an in-memory DataSet and one from a user-defined table function (UDTF). When I tested JOIN and LEFT JOIN, both returned the same result, although I expected the LEFT JOIN to return more rows than the JOIN.

Here is my test code:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.TableFunction;

public class ExerciseUDF {

    public static void main(String[] args) throws Exception {
        test_3();
    }

    public static void test_3() throws Exception {
        // 1. set up execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataSet<WC> input = env.fromElements(
                new WC("Hello", 1),
                new WC("Ciao", 1),
                new WC("Hello", 1));

        // 2. register the DataSet as table "WordCount" and the table function used by both queries
        tEnv.registerDataSet("WordCount", input, "word, frequency");
        tEnv.registerFunction("myTableUpperFunc", new MyTableFunc_2());

        Table table;
        DataSet<WCUpper> resultUpper;

        // 3. table LEFT JOIN user-defined table
        System.out.println("table left join user defined table");
        table = tEnv.sql(
                "SELECT S.word AS word, S.frequency AS frequency, T.myupper AS myupper " +
                "FROM WordCount AS S LEFT JOIN LATERAL TABLE(myTableUpperFunc(S.word)) AS T(word, myupper) " +
                "ON S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print(); // output: WCUpper Ciao 1 CIAO, but no row with Hello

        // 4. table (inner) JOIN user-defined table
        System.out.println("table join user defined table");
        table = tEnv.sql(
                "SELECT S.word AS word, S.frequency AS frequency, T.myupper AS myupper " +
                "FROM WordCount AS S JOIN LATERAL TABLE(myTableUpperFunc(S.word)) AS T(word, myupper) " +
                "ON S.word = T.word");
        resultUpper = tEnv.toDataSet(table, WCUpper.class);
        resultUpper.print(); // output: WCUpper Ciao 1 CIAO
    }

    public static class WC {
        public String word;
        public long frequency;

        // public constructor to make it a Flink POJO
        public WC() {}

        public WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "WC " + word + " " + frequency;
        }
    }

    // result POJO (not shown in the original post); fields match the query's column aliases
    public static class WCUpper {
        public String word;
        public long frequency;
        public String myupper;

        // public no-argument constructor to make it a Flink POJO
        public WCUpper() {}

        @Override
        public String toString() {
            return "WCUpper " + word + " " + frequency + " " + myupper;
        }
    }

    // user-defined table function: emits one (word, upper-cased word) row per input
    public static class MyTableFunc_2 extends TableFunction<Tuple2<String, String>> {
        public void eval(String str) { // Hello --> (Hello, HELLO)
            System.out.println("upper func executed for " + str);
            collect(new Tuple2<String, String>(str, str.toUpperCase()));
        }
    }
}

The LEFT JOIN and JOIN queries produce the same output. In both cases only one row is returned:

WCUpper Ciao 1 CIAO

However, I think that the left join query should preserve the 'Hello' rows.
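To make the expected semantics concrete, here is a minimal plain-Java model (no Flink dependency) of what JOIN and LEFT JOIN against a table function should return under standard SQL semantics. The names `LateralJoinSemantics`, `lateralJoin` and `upperExceptHello` are hypothetical, and result rows are modeled as strings formatted like the `WCUpper` output. With a function that behaves like `MyTableFunc_2` (one matching row per word) both joins return all three rows; the results differ only when the function emits no matching row for some left row, here a hypothetical variant that emits nothing for "Hello". In that case the LEFT JOIN must keep the row, padded with null.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of JOIN vs. LEFT JOIN against a table function
// (SQL semantics only, not how Flink executes the query).
class LateralJoinSemantics {

    // Left-side row, mirroring the question's WC POJO.
    static final class WC {
        final String word;
        final long frequency;

        WC(String word, long frequency) {
            this.word = word;
            this.frequency = frequency;
        }
    }

    // For each left row, call the table function and keep every emitted (word, myupper)
    // pair that satisfies the ON condition S.word = T.word. If none survives, an inner
    // join drops the left row, while a left join keeps it with myupper = null.
    static List<String> lateralJoin(List<WC> left,
                                    Function<String, List<String[]>> tableFunc,
                                    boolean leftOuter) {
        List<String> out = new ArrayList<>();
        for (WC s : left) {
            boolean matched = false;
            for (String[] t : tableFunc.apply(s.word)) {
                if (s.word.equals(t[0])) {
                    out.add("WCUpper " + s.word + " " + s.frequency + " " + t[1]);
                    matched = true;
                }
            }
            if (!matched && leftOuter) {
                out.add("WCUpper " + s.word + " " + s.frequency + " null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<WC> input = Arrays.asList(new WC("Hello", 1), new WC("Ciao", 1), new WC("Hello", 1));

        // Behaves like MyTableFunc_2: one (str, STR) row per input.
        Function<String, List<String[]>> upper =
                str -> Collections.singletonList(new String[] {str, str.toUpperCase()});
        // Hypothetical variant that emits nothing for "Hello".
        Function<String, List<String[]>> upperExceptHello =
                str -> str.equals("Hello")
                        ? Collections.<String[]>emptyList()
                        : Collections.singletonList(new String[] {str, str.toUpperCase()});

        System.out.println(lateralJoin(input, upper, false));            // 3 rows
        System.out.println(lateralJoin(input, upper, true));             // 3 rows
        System.out.println(lateralJoin(input, upperExceptHello, false)); // 1 row: Ciao
        System.out.println(lateralJoin(input, upperExceptHello, true));  // 3 rows, Hello padded
    }
}
```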


  • Yes, you are right.

    This is a bug in the translation of TableFunction outer joins with predicates and needs to be fixed.

    Thanks, Fabian
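The answer attributes the problem to how outer joins with a join predicate are translated. The following plain-Java sketch is not Flink's planner code; it only contrasts three ways of evaluating `LEFT JOIN LATERAL TABLE(...) ON pred`: evaluating the predicate as part of the join (correct), applying it as a filter after joining `ON TRUE` (which silently turns the outer join into an inner join), and moving the predicate into the function before joining `ON TRUE` (equivalent to the correct form, assuming every column the predicate reads is available to the function). The class and method names and the predicate `T.myupper = 'CIAO'` are hypothetical, chosen so that the 'Hello' rows find no match. Later Flink documentation notes that only the literal TRUE is supported as the predicate of a left outer join against a lateral table, which makes the third form a practical rewrite.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Plain-Java model of where the ON predicate of LEFT JOIN LATERAL TABLE(...) is evaluated.
// Illustrative only; this is not Flink's planner or runtime code.
class OuterJoinPredicatePlacement {

    // Joined row: left word plus the function's (word, myupper) columns, null when padded.
    static final class Row {
        final String sWord, tWord, tUpper;

        Row(String sWord, String tWord, String tUpper) {
            this.sWord = sWord;
            this.tWord = tWord;
            this.tUpper = tUpper;
        }

        @Override
        public String toString() {
            return sWord + "|" + tWord + "|" + tUpper;
        }
    }

    // Correct: pred selects the matching emitted rows; a left row without any match
    // is kept once, padded with nulls.
    static List<Row> leftJoinOn(List<String> left, Function<String, List<String[]>> func,
                                BiPredicate<String, String[]> pred) {
        List<Row> out = new ArrayList<>();
        for (String s : left) {
            boolean matched = false;
            for (String[] t : func.apply(s)) {
                if (pred.test(s, t)) {
                    out.add(new Row(s, t[0], t[1]));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Row(s, null, null));
            }
        }
        return out;
    }

    // Incorrect: join ON TRUE, then apply pred like a WHERE clause. Rows failing pred are
    // dropped, and padded rows (NULL columns) never satisfy it, so unmatched left rows vanish.
    static List<Row> leftJoinThenFilter(List<String> left, Function<String, List<String[]>> func,
                                        BiPredicate<String, String[]> pred) {
        List<Row> out = new ArrayList<>();
        for (Row r : leftJoinOn(left, func, (s, t) -> true)) {
            if (r.tWord != null && pred.test(r.sWord, new String[] {r.tWord, r.tUpper})) {
                out.add(r);
            }
        }
        return out;
    }

    // Equivalent rewrite: evaluate pred inside the function, then join ON TRUE.
    static Function<String, List<String[]>> pushInto(Function<String, List<String[]>> func,
                                                     BiPredicate<String, String[]> pred) {
        return s -> {
            List<String[]> kept = new ArrayList<>();
            for (String[] t : func.apply(s)) {
                if (pred.test(s, t)) {
                    kept.add(t);
                }
            }
            return kept;
        };
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Hello", "Ciao", "Hello");
        // Behaves like MyTableFunc_2: emits (str, str.toUpperCase()).
        Function<String, List<String[]>> upper =
                s -> Collections.singletonList(new String[] {s, s.toUpperCase()});
        // Hypothetical predicate T.myupper = 'CIAO', so the 'Hello' rows have no match.
        BiPredicate<String, String[]> pred = (s, t) -> t[1].equals("CIAO");

        System.out.println(leftJoinOn(words, upper, pred));         // 3 rows, Hello padded
        System.out.println(leftJoinThenFilter(words, upper, pred)); // 1 row, Ciao only
        System.out.println(leftJoinOn(words, pushInto(upper, pred), (s, t) -> true)); // 3 rows
    }
}
```

Evaluating the predicate inside the function keeps the outer-join guarantee intact: the function simply emits zero rows for a left row that would not match, and the join then pads that row instead of discarding it.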