I have an Apache Spark batch job running continuously on AWS EMR. It pulls from AWS S3, runs a couple of jobs with that data, and then stores the data in an RDS instance.
However, there seems to be a long period of inactivity between jobs.
This is the CPU use:
And this is the network:
Notice the gap between each column: it is almost as wide as the activity column itself!
At first I thought these two columns were shifted (when it was pulling from S3, it wasn't using a lot of CPU and vice-versa) but then I noticed that these two graphs actually follow each other. This makes sense since the RDDs are lazy and will thus pull as the job is running.
Which leads to my question: what is Spark doing during that time? All of the Ganglia graphs are zeroed for that period. It is as if the cluster decided to take a break before each job.
Thanks.
EDIT: Looking at the logs, this is the part where it seems to take an hour of...doing nothing?
15/04/27 01:13:13 INFO storage.DiskBlockManager: Created local directory at /mnt1/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1429892010439_0020/spark-c570e510-934c-4510-a1e5-aa85d407b748
15/04/27 01:13:13 INFO storage.MemoryStore: MemoryStore started with capacity 4.9 GB
15/04/27 01:13:13 INFO netty.NettyBlockTransferService: Server created on 37151
15/04/27 01:13:13 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/04/27 01:13:13 INFO storage.BlockManagerMaster: Registered BlockManager
15/04/27 01:13:13 INFO util.AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@ip-10-0-3-12.ec2.internal:41461/user/HeartbeatReceiver
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 0
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 7
15/04/27 02:30:45 INFO executor.Executor: Running task 77251.0 in stage 0.0 (TID 0)
15/04/27 02:30:45 INFO executor.Executor: Running task 77258.0 in stage 0.0 (TID 7)
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 8
15/04/27 02:30:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 8)
15/04/27 02:30:45 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 15
15/04/27 02:30:45 INFO executor.Executor: Running task 7.0 in stage 0.0 (TID 15)
15/04/27 02:30:45 INFO broadcast.TorrentBroadcast: Started reading broadcast variable
Notice that at 01:13:13 it just hangs there until 02:30:45.
I found the issue. The problem was in the way I was pulling from S3.
We have our data in S3 separated by a date pattern, as in s3n://bucket/2015/01/03/10/40/actualData.txt for the data from 2015-01-03 at 10:40.
So when we want to run the batch process on the whole set, we call sc.textFile("s3n://bucket/*/*/*/*/*/*").
BUT that is bad. In retrospect, this makes sense: for each star (*), Spark needs to list everything in that "directory", and then list everything in each directory under it. A month has about 30 day directories, each day has 24 hour directories, and each hour has 60 minute directories. So the above pattern issues a "list files" call for each star AND then another list call on every entry returned, all the way down to the minutes! This is so that it can eventually find all of the **/actualData.txt files and then union all of their RDDs.
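To get a feel for the scale, here is a rough back-of-the-envelope sketch. It assumes one list call per directory at every level of the glob and a single year of data with 12 months, 30 days, 24 hours and 60 minutes; the function name and the fan-out numbers are illustrative only, not something taken from Spark or Hadoop.

```python
def count_list_calls(fanouts):
    """Estimate list calls for a glob with one '*' per directory level.

    fanouts[i] is the assumed number of entries found in each directory
    at level i (years, months, days, hours, minutes).
    """
    calls = 0
    dirs = 1  # start at the bucket root
    for fanout in fanouts:
        calls += dirs   # one list call for every directory at this level
        dirs *= fanout  # each of them yields `fanout` subdirectories
    calls += dirs       # final '*': list the files in every minute directory
    return calls

# Assumed layout: 1 year, 12 months, 30 days, 24 hours, 60 minutes.
estimated_calls = count_list_calls([1, 12, 30, 24, 60])
print(estimated_calls)
```

Under these assumptions the estimate is in the hundreds of thousands of calls, and if they are issued one after another, that alone could plausibly account for a gap of an hour or more.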
This, of course, is really slow. So the answer was to build these paths in code (a list of strings for all the dates; in our case, all possible dates can be determined) and reduce them into a comma-separated string that can be passed into textFile.
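A minimal sketch of building the paths in code, assuming the layout shown earlier (bucket/YYYY/MM/DD/HH/mm/actualData.txt) and one file per minute. The helper name minute_paths and its parameters are made up for illustration, and the Spark call is left as a comment because it needs a live SparkContext.

```python
from datetime import datetime, timedelta

def minute_paths(start, end, prefix="s3n://bucket", filename="actualData.txt"):
    """Return one path per minute in [start, end), using the layout
    prefix/YYYY/MM/DD/HH/mm/filename."""
    paths = []
    current = start
    while current < end:
        paths.append(f"{prefix}/{current:%Y/%m/%d/%H/%M}/{filename}")
        current += timedelta(minutes=1)
    return paths

# Three minutes of data starting at 2015-01-03 10:40.
paths = minute_paths(datetime(2015, 1, 3, 10, 40), datetime(2015, 1, 3, 10, 43))
joined = ",".join(paths)

# With an existing SparkContext `sc` (not created here):
# rdd = sc.textFile(joined)
```

Because every path is fully specified, there is no wildcard left to expand, so the directory-by-directory listing is skipped.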
If in your case you can't determine all of the possible paths, consider restructuring your data, or building as much of each path as possible and only using * towards the end of the path, or using the AmazonS3Client to get all the keys through the list-objects API (which lets you get ALL keys in a bucket under a prefix very quickly) and then passing them as a comma-separated string into textFile. It will still make a listStatus call for each file and it will still be serial, but there will be far fewer calls.
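The list-objects approach could look roughly like this. The answer refers to the Java AmazonS3Client; this sketch assumes Python with boto3 instead, and the helper names list_keys and to_spark_paths are hypothetical. The client is passed in as a parameter so the functions themselves do not need AWS credentials.

```python
def list_keys(s3_client, bucket, prefix=""):
    """Collect every key under `prefix` using paginated list-objects calls."""
    keys = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no matches has no "Contents" entry at all.
        for obj in page.get("Contents", []):
            keys.append(obj["Key"])
    return keys

def to_spark_paths(bucket, keys, scheme="s3n"):
    """Turn S3 keys into one comma-separated string of full paths."""
    return ",".join(f"{scheme}://{bucket}/{key}" for key in keys)

# Usage (needs boto3, AWS credentials and a SparkContext `sc`; not run here):
# import boto3
# keys = list_keys(boto3.client("s3"), "bucket", prefix="2015/01/")
# rdd = sc.textFile(to_spark_paths("bucket", keys))
```

Each list-objects page returns up to 1000 keys, so enumerating a prefix this way takes far fewer round trips than listing one directory level at a time.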
However, all of these solutions just delay the inevitable; as more and more data accumulates, more and more listStatus calls will be made serially. The root of the issue seems to be that sc.textFile("s3n://...") pretends that S3 is a file system, which it is not. It is a key-value store. Spark (and Hadoop) need a different way of dealing with S3 (and possibly other key-value stores) that doesn't assume a file system.