Tags: scala, apache-flink

Proper syntax for reading a CSV using Apache Flink (Scala)


I need to read a CSV file that has 6 columns, of types Integer, String, String, String, Integer, and Integer, respectively. I would like to use Apache Flink's ExecutionEnvironment.readCsvFile method, but I keep getting typing and argument errors. I currently have:

val env = ExecutionEnvironment.getExecutionEnvironment
val lines = env.readCsvFile[Integer, String, String, String, Integer, Integer]("C:/Users/zoldham/IdeaProjects/flinkpoc/Data/gun-violence-data_01-2013_03-2018.csv")

And I get:

Error:(43, 32) wrong number of type parameters for method readCsvFile: [T](filePath: String, lineDelimiter: String, fieldDelimiter: String, quoteCharacter: Character, ignoreFirstLine: Boolean, ignoreComments: String, lenient: Boolean, includedFields: Array[Int], pojoFields: Array[String])(implicit evidence$1: scala.reflect.ClassTag[T], implicit evidence$2: org.apache.flink.api.common.typeinfo.TypeInformation[T])org.apache.flink.api.scala.DataSet[T]

Note that these are lines 42 and 43. What would the proper syntax look like? I have been unable to find any examples to use as a baseline for what it should look like. Thanks!


Solution

  • You need to specify a tuple or a case class as the input type. Try the following (a case-class variant is sketched after the snippet):

    import org.apache.flink.api.scala._  // brings the implicit TypeInformation into scope

    val env = ExecutionEnvironment.getExecutionEnvironment
    val lines = env.readCsvFile[(Integer, String, String, String, Integer, Integer)]("C:/Users/zoldham/IdeaProjects/flinkpoc/Data/gun-violence-data_01-2013_03-2018.csv")
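    For the case-class alternative mentioned above, a minimal sketch might look like the following. The GunViolenceRecord class and its field names are illustrative assumptions (they are not taken from the question), and ignoreFirstLine = true is only needed if the file has a header row.

    import org.apache.flink.api.scala._

    // Hypothetical case class matching the six CSV columns; field names are assumptions.
    case class GunViolenceRecord(
      incidentId: Int,
      date: String,
      state: String,
      city: String,
      killed: Int,
      injured: Int
    )

    object ReadCsvExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Columns are mapped to the case-class fields by position;
        // ignoreFirstLine skips a header row if the file has one.
        val records: DataSet[GunViolenceRecord] =
          env.readCsvFile[GunViolenceRecord](
            "C:/Users/zoldham/IdeaProjects/flinkpoc/Data/gun-violence-data_01-2013_03-2018.csv",
            ignoreFirstLine = true
          )

        // Quick sanity check: print the first few parsed records.
        records.first(10).print()
      }
    }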