I need to write a table result to S3 in ORC format, and this is how I do it:
tEnv.createTemporaryTable(
    "my_output_table",
    TableDescriptor.forConnector("filesystem")
        .schema(outputSchema)
        .option("path", s3OutputPath)
        .format(FormatDescriptor.forFormat("orc").build())
        .build());
finalResultToInsert.executeInsert("my_output_table");
However, at runtime it throws the following error:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'orc' in the classpath.
at org.apache.flink.table.filesystem.FileSystemTableSink.<init>(FileSystemTableSink.java:128) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.filesystem.FileSystemTableFactory.createDynamicTableSink(FileSystemTableFactory.java:87) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:179) ~[flink-table_2.12-1.14.2.jar:1.14.2]
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:394) ~[flink-table_2.12-1.14.2.jar:1.14.2]
......
I have already included the relevant dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_2.11</artifactId>
    <version>1.14.2</version>
</dependency>
In the generated jar file I can see the flink-orc classes:
org/apache/flink/orc/
org/apache/flink/orc/AbstractOrcFileInputFormat.class
org/apache/flink/orc/OrcFileFormatFactory$1.class
org/apache/flink/orc/OrcFilters$LessThanEquals.class
org/apache/flink/orc/AbstractOrcFileInputFormat$OrcVectorizedReader.class
org/apache/flink/orc/OrcFilters$Not.class
org/apache/flink/orc/OrcFileFormatFactory.class
org/apache/flink/orc/OrcColumnarRowSplitReader.class
org/apache/flink/orc/OrcColumnarRowSplitReader$ColumnBatchGenerator.class
org/apache/flink/orc/AbstractOrcFileInputFormat$OrcReaderBatch.class
org/apache/flink/orc/OrcFilters$In.class
......
So I really don't understand why it still cannot find the format factory in the classpath.
As a side note, the project also includes the flink-avro dependency, and if I change the output format from orc to avro it works fine.
Also, I am running the job on AWS EMR. The EMR release is 6.6.0, which ships Flink 1.14.2: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-660-release.html
Could anyone help with that? Thanks a lot!
OK, it looks like I resolved the problem by moving
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-orc_2.11</artifactId>
    <version>1.14.2</version>
</dependency>
to the top of the dependencies section in the Maven pom, and it worked. I think it comes down to the order of class loading: with flink-orc listed first, it is loaded first, which might resolve whatever conflict was preventing the flink-orc format factory from being found.
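One way to check this guess, as a rough sketch rather than a confirmed diagnosis: as far as I understand, Flink looks up format factories through Java's ServiceLoader mechanism, reading files named META-INF/services/org.apache.flink.table.factories.Factory. When several dependencies ship that file and the jar is built as a fat jar, the copies can overwrite each other depending on dependency order, which would explain why the classes are present but the 'orc' identifier is not found. The small class below (the name ListFormatFactories is made up for illustration) uses only the JDK and prints every such service file visible on the classpath together with the factory class names it lists.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper class, not part of Flink.
class ListFormatFactories {

    // Assumption: this is the service file Flink 1.14 reads to discover table/format factories.
    static final String SERVICE_FILE =
            "META-INF/services/org.apache.flink.table.factories.Factory";

    // Parses a ServiceLoader-style file: one class name per line,
    // '#' starts a comment, blank lines are ignored.
    public static List<String> parseServiceFile(BufferedReader reader) throws IOException {
        List<String> names = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            int hash = line.indexOf('#');
            if (hash >= 0) {
                line = line.substring(0, hash);
            }
            line = line.trim();
            if (!line.isEmpty()) {
                names.add(line);
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // getResources returns every copy of the file visible to this class loader.
        Enumeration<URL> files = loader.getResources(SERVICE_FILE);
        while (files.hasMoreElements()) {
            URL url = files.nextElement();
            System.out.println(url);
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
                for (String name : parseServiceFile(reader)) {
                    System.out.println("  " + name);
                }
            }
        }
    }
}
```

Run it with the job jar and this class on the classpath. If org.apache.flink.orc.OrcFileFormatFactory is missing from the output even though the class itself is in the jar, the service files were probably overwritten during packaging. In that case, merging them with the maven-shade-plugin's ServicesResourceTransformer should be more robust than relying on dependency order, though I have not verified that on this EMR setup.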