amazon-ec2 amazon-s3 apache-spark hadoop-yarn spark-streaming

Spark Streaming app streams files that have already been streamed


We have a Spark Streaming app deployed on a YARN cluster in EC2 with 1 name node and 2 data nodes. We submit the app with 11 executors, each with 1 core and 588 MB of RAM. The app streams from an S3 directory that is constantly being written to; this is the code that sets it up:

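import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sparkConf, Seconds(10)) // 10-second batch duration
// Filter accepts every path; the last argument (newFilesOnly = true) ignores files that already exist at startup
val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](Settings.S3RequestsHost, (f: Path) => true, true)
// some maps and other logic here
ssc.start()
ssc.awaitTermination()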

We use fileStream instead of textFileStream so that we can control how Spark handles files that already exist when the process starts. We want to process only the files added after the process is launched and skip the existing ones. The batch duration is 10 seconds.

The process works fine as long as we add a small number of files to S3, say 4 or 5. In the streaming UI we can see the stages execute successfully on the executors, one for each file processed. But sometimes, when we add a larger number of files, we see strange behavior: the application starts streaming files that have already been streamed.

For example, I add 20 files to S3. The files are processed in 3 batches: the first batch processes 7 files, the second 8, and the third 5. No more files are added to S3 at this point, but Spark starts repeating these batches endlessly with the same files! Any thoughts on what could be causing this?

I've posted a Jira ticket for this issue: https://issues.apache.org/jira/browse/SPARK-3553


Solution

  • Note the sentence "The files must be created in the dataDirectory by atomically moving or renaming them into the data directory" from the Spark Streaming Programming Guide. The entire file must appear all at once, rather than being created empty and then appended to.

    One approach is to have CloudBerry put the files somewhere else, and then periodically run a script that moves or renames the files into the directory your streaming app is watching.
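
The "move into the watched directory" step could look roughly like the sketch below. It is only an illustration under some assumptions: `StagedFileMover`, `moveCompleted`, `staging` and `watched` are hypothetical names, every file in the staging directory is assumed to be fully written already, and both directories are assumed to live on a filesystem where `java.nio` can do an atomic move (for example a local disk). S3 itself has no atomic rename, so there the same idea would have to go through whatever move or rename operation your upload tool or Hadoop filesystem layer provides.

```scala
import java.io.File
import java.nio.file.{Files, Path, StandardCopyOption}

object StagedFileMover {
  // Moves every regular file from `staging` into `watched` and returns the new paths.
  // ATOMIC_MOVE makes each file show up in `watched` in a single step; it throws
  // if the filesystem cannot do that (for example across different file stores).
  // Assumption: files in `staging` are complete before this is called.
  def moveCompleted(staging: Path, watched: Path): Seq[Path] = {
    // listFiles returns null if `staging` is not a readable directory
    val files = Option(staging.toFile.listFiles).getOrElse(Array.empty[File])
    files.filter(_.isFile).toSeq.map { f =>
      Files.move(f.toPath, watched.resolve(f.getName), StandardCopyOption.ATOMIC_MOVE)
    }
  }
}
```

Running `StagedFileMover.moveCompleted(stagingDir, watchedDir)` from a cron job or a scheduled task would then be the "script" part; the streaming app never sees a half-written file because the writes happen in the staging directory only.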