hadoop, apache-spark, hdfs, mapr

How to create a MapFile with Spark and access it?


I am trying to create a MapFile from a Spark RDD, but can't find enough information. Here are my steps so far:

I started with,

rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

which threw an exception because the keys in a MapFile must be sorted. So I modified it to:

rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
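
Fleshed out, a minimal runnable version of that save could look something like the following (the toy data, key/value types, output path, and class name are placeholders for illustration, not my real code):

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class MapFileSaveSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("mapfile-save").setMaster("local[2]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Toy data standing in for the real RDD.
        JavaPairRDD<String, String> pairs = sc.parallelizePairs(Arrays.asList(
                new Tuple2<>("apple", "1"),
                new Tuple2<>("cherry", "3"),
                new Tuple2<>("banana", "2")));

        pairs.sortByKey()                // MapFile keys must be in sorted order
             .mapToPair(t -> new Tuple2<>(new Text(t._1()), new Text(t._2())))
             .saveAsNewAPIHadoopFile(
                     "/tmp/my-mapfile",              // placeholder output directory
                     Text.class,                     // key class
                     Text.class,                     // value class
                     MapFileOutputFormat.class,      // writes one MapFile (index + data) per partition
                     new Configuration());

        sc.stop();
    }
}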

The sorted save worked fine and my MapFile was created, so the next step was accessing the file. Using the directory name where the parts were created failed with an error saying that it could not find the data file. Back to Google, I found that in order to access the MapFile parts I needed to use:

Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p, key, ret);

Naively, I ignored the HashPartitioner bit and expected that this would find my entry, but no luck. So my next step was to loop over the readers and do a get(..). This did work, but it was extremely slow, as the files were created by 128 tasks, resulting in 128 part files.
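
That brute-force loop, with hypothetical Text key/value types and a placeholder directory name, looks roughly like this:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;

public class MapFileScanLookup {
    // Probes every part file until the key is found: O(number of parts) MapFile index lookups.
    static Writable lookup(String dir, Text key, Text value) throws IOException {
        MapFile.Reader[] readers =
                MapFileOutputFormat.getReaders(new Path(dir), new Configuration());
        for (MapFile.Reader reader : readers) {
            Writable found = reader.get(key, value);
            if (found != null) {
                return found;        // key located in this part file
            }
        }
        return null;                 // key not present in any part file
    }
}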

So I investigated the importance of the HashPartitioner and found that getEntry uses it internally to identify which reader holds the key, but it seems that Spark does not use the same partitioning logic. So I modified it to:

rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

But again the two HashPartitioners did not match. So, on to the questions...

  • Is there a way to combine the MapFiles efficiently (as this would sidestep the partitioning logic)?
  • MapFileOutputFormat.getReaders(new Path(file), new Configuration()); is very slow. Can I identify the reader more efficiently?
  • I am using MapR-FS as the underlying DFS. Will it be using the same HashPartitioner implementation?
  • Is there a way to avoid repartitioning, or should the data be sorted over the whole file (in contrast to being sorted within each partition)?
  • I am also getting an exception saying that _SUCCESS/data does not exist. Do I need to delete this file manually?

Any links about this would be greatly appreciated.

PS. If entries are sorted, then how is it possible to use the HashPartitioner to locate the correct Reader? This would imply that data parts are hash-partitioned and then sorted by key. So I also tried rdd.repartitionAndSortWithinPartitions(new HashPartitioner(280)), but again without any luck.


Solution

  • Digging into the issue, I found that the Spark HashPartitioner and the Hadoop HashPartitioner have different logic (a small demonstration of the difference is included at the end of this answer).

    So the "brute force" solution I tried and works is the following.

    Save the MapFile using rdd.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(num_of_partitions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);

    Lookup using:

    Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
    org.apache.spark.HashPartitioner p = new org.apache.spark.HashPartitioner(readers.length);
    readers[p.getPartition(key)].get(key, val);

    This is "dirty" as the MapFile access is now bound to the Spark partitioner rather than the intuitive Hadoop HashPartitioner. I could implement a Spark partitioner that uses Hadoop HashPartitioner to improve on though.

    This also does not address the problem of slow access caused by the relatively large number of reducers. I could make this even 'dirtier' by generating the part-file name directly from the partition number, but I am looking for a clean solution, so please post if there is a better approach to this problem.
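
    As far as I can tell, the Hadoop HashPartitioner computes (key.hashCode() & Integer.MAX_VALUE) % numPartitions, while the Spark HashPartitioner computes key.hashCode() % numPartitions shifted into the non-negative range, so the two can disagree whenever key.hashCode() is negative (and, of course, whenever the object used for partitioning hashes differently from the Writable used at lookup time). A tiny standalone demonstration of the mismatch:

    public class PartitionerDiffDemo {
        public static void main(String[] args) {
            int numPartitions = 3;
            int hash = -1;                   // e.g. a key whose hashCode() is negative

            // Hadoop-style: mask off the sign bit, then take the modulus.
            int hadoopPartition = (hash & Integer.MAX_VALUE) % numPartitions;   // -> 1

            // Spark-style: plain modulus, shifted into the non-negative range.
            int sparkPartition = hash % numPartitions;
            if (sparkPartition < 0) {
                sparkPartition += numPartitions;                                 // -> 2
            }

            System.out.println("Hadoop: " + hadoopPartition + ", Spark: " + sparkPartition);
        }
    }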
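
    Finally, a rough, untested sketch of the wrapper mentioned above: a custom Spark Partitioner (the class name is a placeholder of mine) that applies the same formula as the Hadoop HashPartitioner, so the partition chosen at save time matches the reader chosen by MapFileOutputFormat.getEntry:

    import org.apache.spark.Partitioner;

    // Mimics the Hadoop HashPartitioner formula so that save-time partitioning
    // and MapFileOutputFormat.getEntry agree on the part file.
    public class HadoopCompatiblePartitioner extends Partitioner {
        private final int numPartitions;

        public HadoopCompatiblePartitioner(int numPartitions) {
            this.numPartitions = numPartitions;
        }

        @Override
        public int numPartitions() {
            return numPartitions;
        }

        @Override
        public int getPartition(Object key) {
            // Same formula as the Hadoop HashPartitioner; for the lookup to work, the key
            // must hash the same way here as the Writable used at read time (e.g. partition
            // on Text rather than String).
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    The save would then become rdd.repartitionAndSortWithinPartitions(new HadoopCompatiblePartitioner(num_of_partitions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class); and the lookup could go back to the plain MapFileOutputFormat.getEntry call from the question.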