I am new to Apache Flink (version 1.32) and I am trying to read a CSV file into a DataStream.
I was able to read it as a DataStream of String:
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object AdvCsvRead {

  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "src/main/resources/sales_orders.csv"

    // read the whole file once, one String element per line
    val ds: DataStream[String] =
      env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_ONCE, 100)

    ds.print()
    env.execute("AdvCsvRead")
  }

  // the type each CSV row should become
  case class Sales(
    ID: Int,
    Customer: String,
    Product: String,
    Date: String,
    Quantity: Int,
    Rate: Double,
    Tags: String
  )
}
I need an example of how to read the file as CSV into a DataStream of this Scala case class. The documentation on this is limited, so any help is appreciated!
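One way to get a DataStream[Sales] is to keep reading lines as text and parse each line into the case class yourself. The sketch below assumes Scala 2.12 with the flink-streaming-scala dependency (as the imports in the question imply) and a sales_orders.csv that has a header row plus seven unquoted, comma-separated columns in the same order as Sales. The object name AdvCsvReadAsSales and the helper parseLine are hypothetical, and this swaps in a plain String.split parser instead of a dedicated Flink CSV input format.

```scala
import org.apache.flink.streaming.api.scala._

import scala.util.Try

object AdvCsvReadAsSales {

  // Same fields as Sales in the question, using Scala Int instead of java.lang.Integer.
  case class Sales(
      ID: Int,
      Customer: String,
      Product: String,
      Date: String,
      Quantity: Int,
      Rate: Double,
      Tags: String
  )

  // Hypothetical helper: turns one CSV line into a Sales record.
  // Assumes a plain comma separator (no quoted fields with embedded commas)
  // and exactly seven columns. The -1 limit keeps a trailing empty Tags column.
  // Returns None for the header row or any line whose numbers do not parse.
  def parseLine(line: String): Option[Sales] = {
    val cols = line.split(",", -1).map(_.trim)
    if (cols.length != 7) None
    else
      Try(
        Sales(cols(0).toInt, cols(1), cols(2), cols(3), cols(4).toInt, cols(5).toDouble, cols(6))
      ).toOption
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "src/main/resources/sales_orders.csv"

    // Read raw lines, then keep only those that parse into Sales
    // (the header and malformed rows are dropped by returning an empty list).
    val sales: DataStream[Sales] = env
      .readTextFile(path)
      .flatMap(line => parseLine(line).toList)

    sales.print()
    env.execute("AdvCsvReadAsSales")
  }
}
```

Using flatMap with an Option turned into a List lets bad rows disappear instead of failing the job. If the real file contains quoted values or commas inside fields, the split-based parseLine is not sufficient and a proper CSV parser would be needed in its place.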
Here is a complete working example: https://github.com/sandeep540/flink-scala3-csv