I'm working with Zeppelin. I read many files from many sources with Spark Structured Streaming, like this:
val var1 = spark
  .readStream
  .schema(var1_raw)
  .option("sep", ",")
  .option("mode", "PERMISSIVE")
  .option("maxFilesPerTrigger", 100)
  .option("treatEmptyValuesAsNulls", "true")
  .option("newFilesOnly", "true")
  .csv(path_var1)
val chekpoint_var1 = var1
  .writeStream
  .format("csv")
  .option("checkpointLocation", path_checkpoint_var1)
  .option("path", path_checkpoint)
  .option("header", true)
  .outputMode("Append")
  .queryName("var1_backup")
  .start().awaitTermination()
val var2 = spark
  .readStream
  .schema(var2_raw)
  .option("sep", ",")
  .option("mode", "PERMISSIVE")
  .option("maxFilesPerTrigger", 100)
  .option("treatEmptyValuesAsNulls", "true")
  .option("newFilesOnly", "true")
  .csv(path_var2)
val chekpoint_var2 = var2
  .writeStream
  .format("csv")
  .option("checkpointLocation", path_checkpoint_var2)
  .option("path", path_checkpoint_2)
  .option("header", true)
  .outputMode("Append")
  .queryName("var2_backup")
  .start().awaitTermination()
When I re-run the job, I get this message: java.lang.IllegalArgumentException: Cannot start query with name var1_backup as a query with that name is already active
*****************the solution*******************
val spark = SparkSession
  .builder
  .appName("test")
  .master("local[*]") // "spark.local" is not a Spark property; the master URL is set with master()
  .getOrCreate()
spark.sparkContext.setCheckpointDir(path_checkpoint)
After that, I call the checkpoint function on the DataFrame.
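The error message suggests that the query from the previous run is still active in the same Zeppelin SparkSession, so stopping it by name before starting it again may also help. Below is a minimal sketch of that idea, not a tested fix. It assumes `spark`, `var1`, `path_checkpoint_var1` and `path_checkpoint` are defined as in the code above; `stopIfActive` and `query1` are hypothetical names introduced only for this example.

```scala
import org.apache.spark.sql.SparkSession

// In Zeppelin a session usually exists already; getOrCreate() reuses it.
val spark = SparkSession.builder.getOrCreate()

// Hypothetical helper: stop every active streaming query registered under `name`.
// Unnamed queries have a null name and are simply skipped by the comparison.
def stopIfActive(name: String): Unit =
  spark.streams.active
    .filter(q => q.name == name)
    .foreach(q => q.stop())

// Clear leftovers from a previous run of the paragraph.
stopIfActive("var1_backup")
stopIfActive("var2_backup")

// Start the query without blocking, so a second query can be started after it.
val query1 = var1
  .writeStream
  .format("csv")
  .option("checkpointLocation", path_checkpoint_var1)
  .option("path", path_checkpoint)
  .option("header", true)
  .outputMode("append")
  .queryName("var1_backup")
  .start()

// Block only once, after all queries have been started.
spark.streams.awaitAnyTermination()
```

Note that `.start().awaitTermination()` blocks until that query terminates, so when both queries are started in the same paragraph the second one is not reached while the first is running. Starting each query with `start()` and waiting once with `spark.streams.awaitAnyTermination()` avoids that.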