scala, apache-spark, bigdata, cloudera, hortonworks-sandbox

Checkpointing on CSV file streaming with Scala


I'm working with Spark Streaming and don't want to reprocess the old files when a new streaming file arrives every 10 minutes:

val val1 = spark
  .read
  .option("header", "true")
  .option("inferSchema", "true") // "schema" is not a read option; inferSchema infers the column types
  .option("sep", ",")
  .csv(path_to_file) // csv() already returns a DataFrame, so toDF() is redundant
  .cache()
val1.createOrReplaceTempView("test") // registerTempTable is deprecated

After creating the DataFrame I apply some transformations and processing. Can checkpointing help me here, and how would I use it in my case?
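For context, spark.read is a one-shot batch read that rescans every file in the directory each time it runs, while the Structured Streaming file source records processed files in its checkpoint and only picks up new ones. A minimal sketch of such a source follows; the schema and column names are assumptions, since a streaming CSV source requires an explicit schema:

import org.apache.spark.sql.types._

// a streaming CSV source needs an explicit schema (inference is disabled by default)
val schema = new StructType()
  .add("id", StringType)     // assumed column
  .add("value", DoubleType)  // assumed column

val streamDF = spark.readStream
  .option("header", "true")
  .option("sep", ",")
  .schema(schema)
  .csv(path_to_file) // the directory is watched for newly added CSV files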


Solution

  • The solution is to set a checkpoint directory on the SparkContext when building the session:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder
      .appName("test")
      .master("local[*]") // "spark.local" is not a valid config key; use master() to set the master URL
      .getOrCreate()
    spark.sparkContext.setCheckpointDir(path_checkpoint)

    After that I call the checkpoint function on the DataFrame.
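    As a minimal sketch of that call, using val1, the DataFrame from the question:

    // checkpoint() materializes the DataFrame under the directory set with
    // setCheckpointDir and truncates its lineage
    val checkpointed = val1.checkpoint()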

    And I specified a trigger to execute the job:

    import org.apache.spark.sql.streaming.Trigger

    val query = streamDF // a streaming DataFrame, e.g. from spark.readStream (see the sketch above)
      .writeStream
      .format("csv")
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .option("checkpointLocation", CheckPoint) // where Spark records which files were already processed
      .option("path", Path) // output directory for the CSV sink
      .option("header", "true")
      .trigger(Trigger.ProcessingTime("180 seconds"))
      .outputMode("append")
      .queryName("test")
      .start()
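    Finally, keeping the driver alive lets the trigger keep firing; on a restart, the state kept under checkpointLocation is what lets Spark skip the files it has already processed:

    // block until the query is stopped or fails; the checkpoint state
    // ensures already-processed files are not read again after a restart
    query.awaitTermination()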