Search code examples
unit-testingapache-flink

Add a unit test for Flink SQL


I am using Flink v1.7.1. When I finished a Flink streaming job with tableSource, SQL and tableSink, I have no idea how to add a unit test for it.


Solution

  • I found a good example about how to testing flink sql with the help of user mailing list, here is a example.

    package org.apache.flink.table.runtime.stream.sql;
    
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.api.java.tuple.Tuple5;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.runtime.utils.JavaStreamTestData;
    import org.apache.flink.table.runtime.utils.StreamITCase;
    import org.apache.flink.test.util.AbstractTestBase;
    import org.apache.flink.types.Row;
    
    import org.junit.Test;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Integration tests for streaming SQL.
     */
    public class JavaSqlITCase extends AbstractTestBase {
    
        @Test
        public void testRowRegisterRowWithNames() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
            StreamITCase.clear();
    
            List<Row> data = new ArrayList<>();
            data.add(Row.of(1, 1L, "Hi"));
            data.add(Row.of(2, 2L, "Hello"));
            data.add(Row.of(3, 2L, "Hello world"));
    
            TypeInformation<?>[] types = {
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.LONG_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO};
            String[] names = {"a", "b", "c"};
    
            RowTypeInfo typeInfo = new RowTypeInfo(types, names);
    
            DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
    
            Table in = tableEnv.fromDataStream(ds, "a,b,c");
            tableEnv.registerTable("MyTableRow", in);
    
            String sqlQuery = "SELECT a,c FROM MyTableRow";
            Table result = tableEnv.sqlQuery(sqlQuery);
    
            DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
            resultSet.addSink(new StreamITCase.StringSink<Row>());
            env.execute();
    
            List<String> expected = new ArrayList<>();
            expected.add("1,Hi");
            expected.add("2,Hello");
            expected.add("3,Hello world");
    
            StreamITCase.compareWithList(expected);
        }
    }
    

    the related code is here