apache-spark, accumulo

Why is my executor memory usage stuck at 0?


I have a pretty simple Spark job that looks like this:

JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
JavaPairRDD<Key,Value> indexSrc =
    rawData.filter(new IndexFilter()).cache();
JavaPairRDD<Key,Value> indexEntries =
    indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
JavaPairRDD<Key,Value> reverseIndexEntries =
    indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
JavaPairRDD<Key,Value> dataEntries =
    rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();

dataEntries.union(indexEntries)
  .union(reverseIndexEntries)
  .repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
  .saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
      AccumuloFileOutputFormat.class, conf);

Where Key and Value are Apache Accumulo's Key and Value classes (using the KryoSerializer).
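For reference, the serializer is configured roughly like this (a sketch; I register the classes via SparkConf.registerKryoClasses, and the app name is illustrative):

SparkConf sparkConf = new SparkConf()
    .setAppName("accumulo-index-build") // illustrative name
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering Key and Value lets Kryo write compact class IDs
    // instead of full class names for every record.
    .registerKryoClasses(new Class<?>[] {
        org.apache.accumulo.core.data.Key.class,
        org.apache.accumulo.core.data.Value.class
    });
JavaSparkContext sc = new JavaSparkContext(sparkConf);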

I'm not sure where exactly to put the calls to cache(), or even if they're needed at all. But I'm concerned that my executors don't seem to be using much of the memory that I've allocated to them:

[Screenshot showing zero memory used on the executors page]

And the "Storage" page in the application UI is empty.

Am I doing something wrong, or has Spark decided that it can't make this job go any faster by storing my RDDs?


Solution

  • "Memory Used" here means memory used for caching RDDs, not overall executor memory consumption.

    In your code you perform only one action, and indexSrc and dataEntries are not used again across actions, so there is no point in caching them. Also note that cache() is lazy: nothing shows up on the Storage page until an action actually materializes the RDD.

    To prove it, add indexSrc.count(); and dataEntries.count(); right after declaring each RDD, then check the executors and Storage pages again:

    JavaPairRDD<Key,Value> rawData = newAccumuloRDD(...);
    JavaPairRDD<Key,Value> indexSrc = rawData.filter(new IndexFilter()).cache();
    indexSrc.count();   // action forces the cached RDD to be materialized
    JavaPairRDD<Key,Value> indexEntries = indexSrc.mapPartitionsToPair(new IndexBuilder(numPartitions));
    JavaPairRDD<Key,Value> reverseIndexEntries = indexSrc.mapPartitionsToPair(new ReverseIndexBuilder(numPartitions));
    JavaPairRDD<Key,Value> dataEntries = rawData.mapPartitionsToPair(new DataBuilder(numPartitions)).cache();
    dataEntries.count(); // same: materializes dataEntries in the cache
    
    dataEntries.union(indexEntries)
      .union(reverseIndexEntries)
      .repartitionAndSortWithinPartitions(new PartitionedIndexRDDPartitioner(NUM_BINS))
      .saveAsNewAPIHadoopFile(pidxBulk.toString(), Key.class, Value.class,
          AccumuloFileOutputFormat.class, conf);
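
    As a side note (a sketch, not part of the question's code): if you do end up reusing an RDD across actions and it might not fit in memory, persisting with MEMORY_AND_DISK avoids recomputation by spilling evicted partitions to disk, and unpersist() frees the blocks once they are no longer needed:

    import org.apache.spark.storage.StorageLevel;

    // Spills evicted partitions to disk instead of recomputing them
    // from the lineage (assumes the data may exceed executor memory).
    JavaPairRDD<Key,Value> indexSrc =
        rawData.filter(new IndexFilter())
               .persist(StorageLevel.MEMORY_AND_DISK());

    // ... actions that reuse indexSrc ...

    indexSrc.unpersist(); // release the cached blocks when done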