Apache Flink SQL InvalidProgramException: Selected sort key is not a sortable type


This is my Flink SQL query:

SELECT t.reportCode FROM query_record_info as t LEFT JOIN credit_report_head as c ON t.reportCode = c.reportCode

When I run it, I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type
    at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
    at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:468)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:467)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
    at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:467)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:270)
    at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:178)
    at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
    at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
    at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:147)
    at org.myorg.quickstart.CreditTest.main(CreditTest.java:108)

But if I remove LEFT from the query, it runs without any problem:

SELECT t.reportCode FROM query_record_info as t JOIN credit_report_head as c ON t.reportCode = c.reportCode
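Note that the two queries are not interchangeable: an inner join drops left rows that have no match, while a LEFT JOIN keeps them with NULL on the right side. The following is a plain-Java sketch of that difference, not Flink code; the class name JoinSketch and the sample codes "R1" and "R2" are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration of join semantics on a single key column.
final class JoinSketch {

    // INNER JOIN: emit a pair only when the code appears on both sides.
    static List<String> innerJoin(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                if (l.equals(r)) {
                    out.add(l + "|" + r);
                }
            }
        }
        return out;
    }

    // LEFT JOIN: every left row is emitted at least once;
    // the right side is "null" when nothing matches.
    static List<String> leftJoin(List<String> left, List<String> right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            boolean matched = false;
            for (String r : right) {
                if (l.equals(r)) {
                    out.add(l + "|" + r);
                    matched = true;
                }
            }
            if (!matched) {
                out.add(l + "|null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("R1", "R2"); // stands in for query_record_info
        List<String> heads = Arrays.asList("R1");         // stands in for credit_report_head
        System.out.println(innerJoin(records, heads)); // [R1|R1]
        System.out.println(leftJoin(records, heads));  // [R1|R1, R2|null]
    }
}
```

So removing LEFT avoids the exception but can also change the result whenever a record has no matching report head.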

I'm new to Flink. Looking forward to your reply.


Solution

        // Create the execution and table environments
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
    
        CreditReport creditReport = JSONUtil.toBean(jsonStr, CreditReport.class);
        DataSource<CreditReport> reportDataSource = env.fromElements(creditReport);
    
    
    
        // Extract the query records from the report
        FlatMapFunction<CreditReport, QueryRecordInfo> queryRecordInfoFlat = new FlatMapFunction<CreditReport,QueryRecordInfo>() {
            @Override
            public void flatMap(CreditReport value, Collector<QueryRecordInfo> out) throws Exception {
                // Read from the function argument rather than the captured outer variable
                List<QueryRecordInfo> queryRecordInfos = value.getQueryRecordInfos();
                for (QueryRecordInfo queryRecordInfo : queryRecordInfos) {
                    out.collect(queryRecordInfo);
                }
            }
    
        };
    
        // Extract the report header from the report
        FlatMapFunction<CreditReport, CreditReportHead> queryRecordHeaderFlat = new FlatMapFunction<CreditReport,CreditReportHead>() {
            @Override
            public void flatMap(CreditReport value, Collector<CreditReportHead> out) throws Exception {
                out.collect(value.getCreditReportHead());
            }
    
    
        };
    
        DataSet<QueryRecordInfo> records = reportDataSource.flatMap(queryRecordInfoFlat);
    
        // Register each DataSet under the table name used in the SQL query
        tableEnv.createTemporaryView("query_record_info", records);
        DataSet<CreditReportHead> headers = reportDataSource.flatMap(queryRecordHeaderFlat);
        tableEnv.createTemporaryView("credit_report_head", headers);
    
        Table queryResult = tableEnv.sqlQuery("SELECT t.reportCode as reportCode,reason as reason FROM credit_report_head as t left JOIN query_record_info as c ON t.reportCode = c.reportCode");
        DataSet<ReportCode> reportHeadDataSet = tableEnv.toDataSet(queryResult, ReportCode.class);
    
        reportHeadDataSet.print();
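
The snippet above does not show the ReportCode class passed to toDataSet. A minimal sketch of what it might look like follows, assuming both selected columns are strings (the original post does not confirm the types). Flink treats a class as a POJO when it is public, has a public no-argument constructor, and exposes every field either publicly or through a getter and setter. When a Table is converted to a POJO, fields are matched by name, so the field names here follow the column aliases reportCode and reason from the query.

```java
// Hypothetical result type for toDataSet(queryResult, ReportCode.class).
// Assumption: both columns are strings; the original post does not show this class.
public class ReportCode {

    // Field names match the column aliases in the SELECT clause.
    private String reportCode;
    private String reason;

    // A public no-argument constructor is required by Flink's POJO rules.
    public ReportCode() {
    }

    public String getReportCode() {
        return reportCode;
    }

    public void setReportCode(String reportCode) {
        this.reportCode = reportCode;
    }

    public String getReason() {
        return reason;
    }

    public void setReason(String reason) {
        this.reason = reason;
    }

    // Makes the output of DataSet.print() readable.
    @Override
    public String toString() {
        return "ReportCode{reportCode=" + reportCode + ", reason=" + reason + "}";
    }
}
```

With a LEFT JOIN, reason is null for report heads that have no matching query record, so a nullable field type such as String is a reasonable choice for that column.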