I'm working through a simple tutorial with Flink and Java using the Table API. What I want to do is really simple: read a CSV file from the local filesystem using a schema, and print it out.
This is how I'm doing it (the code below is compiled from samples in the tutorial section of Flink's website):
package p1;

import org.apache.flink.table.api.*;
import org.apache.flink.api.java.utils.ParameterTool;

public class CabAggregation {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs(args);

        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        final Schema schema = Schema.newBuilder()
                .column("cab_id", DataTypes.INT())
                .column("cab_plate", DataTypes.STRING())
                .column("cab_make", DataTypes.STRING())
                .column("cab_driver", DataTypes.STRING())
                .column("active_trip", DataTypes.STRING())
                .column("pickup_location", DataTypes.STRING())
                .column("target_location", DataTypes.STRING())
                .column("num_pass", DataTypes.INT())
                .build();

        tableEnv.createTemporaryTable("cabs",
                TableDescriptor
                        .forConnector("filesystem")
                        .schema(schema)
                        .option("path", "file:///Users/virtual/Downloads/cabs.csv")
                        .format(FormatDescriptor.forFormat("csv").build())
                        .build());

        Table result = tableEnv.from("cabs").select("*");
        result.execute().print();
    }
}
Running this gives me the following error:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'filesystem' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
Available factory identifiers are:
blackhole
datagen
print
Now, it seems evident that the 'filesystem' connector (and with it CSV) is somehow not available as a factory identifier. I can't figure out why.
I'm building the project with Maven.
You need these two dependencies: flink-connector-files provides the 'filesystem' connector, and flink-csv provides the 'csv' format. Have you added them?
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>${flink.version}</version>
</dependency>
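If you want to check whether the dependencies actually reached the runtime classpath, here is a minimal diagnostic sketch. It assumes that Flink discovers table factories through Java's service-provider mechanism, using service files named META-INF/services/org.apache.flink.table.factories.Factory; treat that file name as an assumption to verify against your Flink version. The class name FactoryListing and the method listServiceEntries are hypothetical helpers, not part of Flink. The code uses only the JDK.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

// Hypothetical helper (not part of Flink): lists the provider classes that
// are registered for a service via META-INF/services files on the classpath.
final class FactoryListing {

    // Assumption: Flink registers its table factories under this service name.
    static final String FLINK_FACTORY_SERVICE = "org.apache.flink.table.factories.Factory";

    /** Returns every provider class name registered for serviceName on the classpath. */
    static List<String> listServiceEntries(String serviceName) throws IOException {
        List<String> entries = new ArrayList<>();
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        if (cl == null) {
            cl = ClassLoader.getSystemClassLoader();
        }
        // Each jar on the classpath may contribute its own service file.
        Enumeration<URL> files = cl.getResources("META-INF/services/" + serviceName);
        while (files.hasMoreElements()) {
            URL file = files.nextElement();
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Service files allow '#' comments and blank lines; skip both.
                    int hash = line.indexOf('#');
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        entries.add(line);
                    }
                }
            }
        }
        return entries;
    }

    public static void main(String[] args) throws IOException {
        List<String> entries = listServiceEntries(FLINK_FACTORY_SERVICE);
        if (entries.isEmpty()) {
            System.out.println("No table factories registered on the classpath.");
        }
        for (String entry : entries) {
            System.out.println(entry);
        }
    }
}
```

Run it with the same classpath as the job (for example from the same Maven module). If the two dependencies above were picked up, the listing should include entries coming from flink-connector-files and flink-csv in addition to the blackhole, datagen and print factories; if it does not, the jars are not on the classpath, or their service files were lost while building a fat jar.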