I started doing the amp-camp 5 exercises. I tried the following 2 scenarios:
Scenario #1
val pagecounts = sc.textFile("data/pagecounts")
pagecounts.checkpoint
pagecounts.count
Scenario #2
val pagecounts = sc.textFile("data/pagecounts")
pagecounts.count
The total time shown in the Spark shell Application UI differed between the two
scenarios. Scenario #1 took 0.5 seconds, while scenario #2 took only 0.2 seconds.
In scenario #1, the checkpoint call does no work by itself: it is neither a transformation nor an action. It only marks the RDD so that, once an action materializes it, Spark saves it to disk. Am I missing something here?
Questions:
I understand that scenario #1 takes more time because the RDD is
checkpointed (written to disk). Is there a way to separate the time spent
on checkpointing from the total time?
The Spark shell Application UI shows the following: scheduler delay, task
deserialization time, GC time, result serialization time, and getting result
time. But it doesn't show a breakdown for checkpointing.
Is there a way to access the above metrics e.g. scheduler delay, GC time and save them programmatically? I want to log some of the above metrics for every action invoked on an RDD.
How can I programmatically access the following information:
Please let me know if you need more information.
The Spark REST API provides almost everything you are asking for.
Some examples:
How much percentage of an RDD is in memory currently?
GET /api/v1/applications/[app-id]/storage/rdd/0
returns:
{
"id" : 0,
"name" : "ParallelCollectionRDD",
"numPartitions" : 2,
"numCachedPartitions" : 2,
"storageLevel" : "Memory Deserialized 1x Replicated",
"memoryUsed" : 28000032,
"diskUsed" : 0,
"dataDistribution" : [ {
"address" : "localhost:54984",
"memoryUsed" : 28000032,
"memoryRemaining" : 527755733,
"diskUsed" : 0
} ],
"partitions" : [ {
"blockName" : "rdd_0_0",
"storageLevel" : "Memory Deserialized 1x Replicated",
"memoryUsed" : 14000016,
"diskUsed" : 0,
"executors" : [ "localhost:54984" ]
}, {
"blockName" : "rdd_0_1",
"storageLevel" : "Memory Deserialized 1x Replicated",
"memoryUsed" : 14000016,
"diskUsed" : 0,
"executors" : [ "localhost:54984" ]
} ]
}
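As a minimal sketch of how the response above could be turned into a percentage, the Scala snippet below divides numCachedPartitions by numPartitions. The helper names (intField, cachedPercent, fetchRddStorage) are hypothetical, the regex extraction is only a stand-in for a proper JSON library, and the base URL passed to fetchRddStorage is an assumption about where your application's UI is reachable.

```scala
import scala.io.Source
import java.util.regex.Pattern

// Hypothetical helper: reads the first integer value of a named field from JSON text.
// A regex keeps the sketch dependency-free; a real client would use a JSON parser.
def intField(json: String, name: String): Option[Long] = {
  val pattern = ("\"" + Pattern.quote(name) + "\"\\s*:\\s*(\\d+)").r
  pattern.findFirstMatchIn(json).map(_.group(1).toLong)
}

// Percentage of partitions that are currently cached.
// Returns None if a field is missing or the RDD reports zero partitions.
def cachedPercent(json: String): Option[Double] =
  for {
    total  <- intField(json, "numPartitions") if total > 0
    cached <- intField(json, "numCachedPartitions")
  } yield 100.0 * cached / total

// Fetches the storage description of one RDD from a running application.
// baseUrl is an assumption, e.g. the address of the application UI.
def fetchRddStorage(baseUrl: String, appId: String, rddId: Int): String = {
  val src = Source.fromURL(s"$baseUrl/api/v1/applications/$appId/storage/rdd/$rddId")
  try src.mkString finally src.close()
}
```

With an application running, something like cachedPercent(fetchRddStorage(baseUrl, appId, 0)) would give the cached percentage for RDD 0. For the sample response above it evaluates to 100.0, since both partitions are cached.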
Overall time taken for computing an RDD?
Computing an RDD is tracked as a job, which Spark splits into stages; each stage may run in one or more attempts.
GET /api/v1/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary
returns:
{
"quantiles" : [ 0.05, 0.25, 0.5, 0.75, 0.95 ],
"executorDeserializeTime" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ],
"executorRunTime" : [ 3.0, 3.0, 4.0, 4.0, 4.0 ],
"resultSize" : [ 1457.0, 1457.0, 1457.0, 1457.0, 1457.0 ],
"jvmGcTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"shuffleReadMetrics" : {
"readBytes" : [ 340.0, 340.0, 342.0, 342.0, 342.0 ],
"readRecords" : [ 10.0, 10.0, 10.0, 10.0, 10.0 ],
"remoteBlocksFetched" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"localBlocksFetched" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ],
"fetchWaitTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"remoteBytesRead" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"totalBlocksFetched" : [ 2.0, 2.0, 2.0, 2.0, 2.0 ]
}
}
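Each metric in this response is an array aligned with the "quantiles" array, so the value at index 2 is the median across tasks. A minimal sketch of reading one metric that way in Scala follows; the helper names (doubleArrayField, metricByQuantile) are hypothetical, and the regex extraction is again only a stand-in for a proper JSON library.

```scala
import java.util.regex.Pattern

// Hypothetical helper: reads the first flat numeric array stored under a field name,
// e.g. "executorRunTime" : [ 3.0, 3.0, 4.0, 4.0, 4.0 ].
def doubleArrayField(json: String, name: String): Option[Vector[Double]] = {
  val pattern = ("\"" + Pattern.quote(name) + "\"\\s*:\\s*\\[([^\\]]*)\\]").r
  pattern.findFirstMatchIn(json).map { m =>
    m.group(1).split(",").map(_.trim).filter(_.nonEmpty).map(_.toDouble).toVector
  }
}

// Pairs each quantile with the metric value reported at that quantile.
// Returns None if either array is missing or their lengths differ.
def metricByQuantile(json: String, metric: String): Option[Map[Double, Double]] =
  for {
    qs <- doubleArrayField(json, "quantiles")
    vs <- doubleArrayField(json, metric) if vs.length == qs.length
  } yield qs.zip(vs).toMap
```

For the sample response above, metricByQuantile(json, "executorRunTime").flatMap(_.get(0.5)) yields Some(4.0), the median executor run time across tasks. The same call with "jvmGcTime" or "executorDeserializeTime" covers the other metrics you mention, and the result can be written to a log after each action.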
Your question is too broad, so I will not address every part of it. I believe every metric Spark exposes is available through the REST API.