Search code examples
scalaavroapache-flink

Avro support in Flink - scala


How to read avro from Flink in scala?

Is it the same for batch/stream/table: StreamExecutionEnvironment/ ExecutionEnvironment / TableEnvironment?

would it be sth like: val custTS: TableSource = new AvroInputFormat("/path/to/file", ...)

Below is java avro implementation ref (connectors), but can't find scala ref anywhere:

  AvroInputFormat<User> users = new AvroInputFormat<User>(in, User.class);
  DataSet<User> usersDS = env.createInput(users);

Solution

  • You can use Flink's InputFormats, including the AvroInputFormat, from the Java as well as the Scala API:

    • Streaming & batch: val avroInputStream = env.createInput(new AvroInputFormat[User](in, classOf[User]))
    • Table API: tableEnv.registerTable("table", avroInputStream.toTable)