Tags: scala, apache-flink, flink-streaming

Read CSV File in Flink as DataStream


I am new to Apache Flink (version 1.32) and am trying to read a CSV file into a DataStream.

I was able to read it as a stream of Strings:

```scala
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}


object AdvCsvRead {

  def main(args: Array[String]): Unit = {

    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val path = "src/main/resources/sales_orders.csv"
    // PROCESS_ONCE reads the file a single time; the interval (ms) only
    // matters for PROCESS_CONTINUOUSLY, where it is the re-scan period
    val ds: DataStream[String] = env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_ONCE, 100)

    ds.print()
    env.execute("AdvCsvRead")
  }

  // Desired type for each CSV row
  case class Sales (
                     var ID: Integer,
                     var Customer: String,
                     var Product: String,
                     var Date: String,
                     var Quantity: Integer,
                     var Rate: Double,
                     var Tags: String
                   )

}
```

I need an example of how to read the file as CSV into a DataStream of the Scala case class above. The documentation on this is limited, so any help is appreciated!
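The per-line conversion being asked for does not depend on Flink itself. A minimal plain-Scala sketch of it follows, assuming the columns appear in the same order as the fields of the question's `Sales` class, that the file may contain a header row, and that no field contains a quoted comma. The `SalesCsv.parse` helper and the lowercase `Int`-typed fields are illustrative choices, not taken from the original.

```scala
// Assumed column order: ID,Customer,Product,Date,Quantity,Rate,Tags
// (idiomatic variant of the question's Sales: immutable fields, Int instead of Integer)
final case class Sales(
    id: Int,
    customer: String,
    product: String,
    date: String,
    quantity: Int,
    rate: Double,
    tags: String
)

object SalesCsv {
  // Hypothetical helper. Naive split on ',' with limit -1 so an empty
  // trailing Tags field is kept; quoted fields with commas are NOT handled.
  def parse(line: String): Option[Sales] = {
    val f = line.split(",", -1).map(_.trim)
    if (f.length != 7) None
    else
      // A non-numeric ID/Quantity/Rate (e.g. the header row) yields None
      try Some(Sales(f(0).toInt, f(1), f(2), f(3), f(4).toInt, f(5).toDouble, f(6)))
      catch { case _: NumberFormatException => None }
  }
}
```

Returning `Option` lets the caller drop the header and malformed rows instead of failing the whole job on one bad line.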


Solution

  • Here is a complete working example: https://github.com/sandeep540/flink-scala3-csv
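For readers who cannot follow the link, here is a minimal sketch of one way to do this with the Scala DataStream API: read the file as plain text with `readTextFile`, then `flatMap` each line through a parser so that lines which do not parse are dropped. This is not necessarily what the linked repository does; the object name `CsvToCaseClass`, the `parse` helper, and the column order are assumptions, and the parser is a naive comma split without quoted-field support.

```scala
import org.apache.flink.streaming.api.scala._

// Assumed row layout: ID,Customer,Product,Date,Quantity,Rate,Tags
// (same columns as the question's Sales class, with idiomatic names/types).
// Defined at top level so Flink's Scala macro can derive its TypeInformation.
final case class Sales(
    id: Int,
    customer: String,
    product: String,
    date: String,
    quantity: Int,
    rate: Double,
    tags: String
)

object CsvToCaseClass {

  // Hypothetical helper: naive comma split, no support for quoted fields.
  // The header row and malformed rows yield None.
  def parse(line: String): Option[Sales] = {
    val f = line.split(",", -1).map(_.trim)
    if (f.length != 7) None
    else
      try Some(Sales(f(0).toInt, f(1), f(2), f(3), f(4).toInt, f(5).toDouble, f(6)))
      catch { case _: NumberFormatException => None }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "src/main/resources/sales_orders.csv"

    // Read raw lines, then keep only those that parse into Sales
    val sales: DataStream[Sales] = env
      .readTextFile(path)
      .flatMap((line: String) => parse(line).toList)

    sales.print()
    env.execute("CsvToCaseClass")
  }
}
```

Using `flatMap` over an `Option` filters and converts in a single step. If the data can contain quoted fields or embedded commas, swapping the naive split for a proper CSV parser would be the next step.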