This is my Flink SQL query:
SELECT t.reportCode FROM query_record_info as t LEFT JOIN credit_report_head as c ON t.reportCode = c.reportCode
When I run it, I get the following error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type
at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:468)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:467)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:467)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:270)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:178)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:147)
at org.myorg.quickstart.CreditTest.main(CreditTest.java:108)
But if I remove LEFT from the query, it runs without any problem:
SELECT t.reportCode FROM query_record_info as t JOIN credit_report_head as c ON t.reportCode = c.reportCode
I'm new to Flink. Looking forward to your reply. Here is my code:
// Create the execution environment
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
CreditReport creditReport = JSONUtil.toBean(jsonStr, CreditReport.class);
DataSource<CreditReport> reportDataSource = env.fromElements(creditReport);
// Extract the query record info entries from the report
FlatMapFunction<CreditReport, QueryRecordInfo> queryRecordInfoFlat = new FlatMapFunction<CreditReport, QueryRecordInfo>() {
    @Override
    public void flatMap(CreditReport value, Collector<QueryRecordInfo> out) throws Exception {
        // Read from the incoming element rather than the captured outer variable
        List<QueryRecordInfo> queryRecordInfos = value.getQueryRecordInfos();
        for (QueryRecordInfo queryRecordInfo : queryRecordInfos) {
            out.collect(queryRecordInfo);
        }
    }
};
// Extract the report header from the report
FlatMapFunction<CreditReport, CreditReportHead> queryRecordHeaderFlat = new FlatMapFunction<CreditReport, CreditReportHead>() {
    @Override
    public void flatMap(CreditReport value, Collector<CreditReportHead> out) throws Exception {
        out.collect(value.getCreditReportHead());
    }
};
DataSet<QueryRecordInfo> records = reportDataSource.flatMap(queryRecordInfoFlat);
tableEnv.createTemporaryView("query_record_info", records);
DataSet<CreditReportHead> headers = reportDataSource.flatMap(queryRecordHeaderFlat);
tableEnv.createTemporaryView("credit_report_head", headers);
Table queryResult = tableEnv.sqlQuery("SELECT t.reportCode as reportCode,reason as reason FROM credit_report_head as t left JOIN query_record_info as c ON t.reportCode = c.reportCode");
DataSet<ReportCode> reportHeadDataSet = tableEnv.toDataSet(queryResult, ReportCode.class);
reportHeadDataSet.print();