scala, apache-spark, spark-structured-streaming

How to continuously monitor a directory by using Spark Structured Streaming


I want Spark to continuously monitor a directory and read CSV files with spark.readStream as soon as they appear in that directory.

Please don't suggest a solution based on Spark Streaming (DStreams). I am looking for a way to do this with Spark Structured Streaming.


Solution

  • Here is a complete solution for this use case:

    If you are running in local (single-machine) mode, for example from spark-shell without a cluster, you can increase the driver memory as follows:

    bin/spark-shell --driver-memory 4G
    

    There is no need to set the executor memory, because in local mode the executor runs inside the driver JVM.

    To complete @T.Gaweda's answer, here is the full code:

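    import org.apache.spark.sql.types.StructType

    // Streaming file sources need an explicit schema unless schema inference is enabled
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val csvDF = spark
      .readStream
      .option("sep", ";")
      .schema(userSchema)           // Specify the schema of the CSV files
      .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

    // Start the query; each new file is printed to the console in full (no truncation)
    csvDF.writeStream.format("console").option("truncate", "false").start()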
    

    Spark will now continuously monitor the specified directory. As soon as a new CSV file is added to it, the streaming query processes that file through the csvDF DataFrame and prints its rows to the console.
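
    The snippet above is meant for spark-shell, where the session stays alive on its own. Outside the shell, the main thread has to block on the query, otherwise the program exits before any file is picked up. Below is a minimal sketch of the same pipeline as a standalone application. The object name DirectoryMonitor, the app name, and the local[*] master are assumptions for illustration, and /path/to/directory is the same placeholder as above.

    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    // Hypothetical application object; the name is illustrative only.
    object DirectoryMonitor {
      // Same schema as in the answer above
      val userSchema: StructType =
        new StructType().add("name", "string").add("age", "integer")

      def main(args: Array[String]): Unit = {
        // local[*] is an assumption for a single-machine run;
        // drop .master(...) when the master is supplied by spark-submit.
        val spark = SparkSession.builder()
          .appName("DirectoryMonitor")
          .master("local[*]")
          .getOrCreate()

        // Streaming read: files that appear in the directory are picked up as they arrive
        val csvDF = spark.readStream
          .option("sep", ";")
          .schema(userSchema)
          .csv("/path/to/directory")

        val query = csvDF.writeStream
          .format("console")
          .option("truncate", "false")
          .start()

        // Block until the query is stopped or fails, so the directory keeps being monitored
        query.awaitTermination()
      }
    }
    ```

    In spark-shell the awaitTermination() call is not needed, because the shell itself keeps the driver running.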

    Note: If you want Spark to infer the schema instead of specifying it, you first have to set the following configuration:

    spark.sqlContext.setConf("spark.sql.streaming.schemaInference", "true")
    

    where spark is your SparkSession.
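
    To show how the inference setting fits together with the read, here is a rough sketch that enables it and then opens the stream without calling .schema(...). The object name InferredSchemaMonitor, the local[*] master, and the header option are assumptions, not part of the original answer. As far as I know, the schema is inferred from files already present when the stream starts, so the directory should contain at least one CSV file at that point.

    ```scala
    import org.apache.spark.sql.SparkSession

    // Hypothetical application object; the name is illustrative only.
    object InferredSchemaMonitor {
      // Configuration key from the note above
      val inferenceKey = "spark.sql.streaming.schemaInference"

      def main(args: Array[String]): Unit = {
        // local[*] is an assumption for a single-machine run
        val spark = SparkSession.builder()
          .appName("InferredSchemaMonitor")
          .master("local[*]")
          .getOrCreate()

        // Allow the file source to infer the schema instead of requiring .schema(...)
        spark.conf.set(inferenceKey, "true")

        val csvDF = spark.readStream
          .option("sep", ";")
          .option("header", "true") // assumption: the files have a header row
          .csv("/path/to/directory")

        // Print the schema Spark inferred from the files it found
        csvDF.printSchema()

        csvDF.writeStream
          .format("console")
          .option("truncate", "false")
          .start()
          .awaitTermination()
      }
    }
    ```

    An explicit schema, as in the main example, is usually the safer choice, since an inferred schema depends on whichever files happen to be in the directory at startup.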