I am trying to create a MapFile from a Spark RDD, but can't find enough information. Here are my steps so far:
I started with,
rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
which threw an exception because the entries of a MapFile must be sorted by key.
So I modified to:
rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
which worked fine and my MapFile was created. So the next step was accessing the file. Using the directory name where the parts were created failed, saying that it cannot find the data file. Back to Google, I found that in order to access the MapFile parts I needed to use:
Object ret = new Object(); // my actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K, V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p, key, ret);
Naively, I ignored the HashPartitioner bit and expected that this would find my entry, but no luck. So my next step was to loop over the readers and do a get(..) on each one. This did work, but it was extremely slow, as the files were created by 128 tasks, resulting in 128 part files.
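For completeness, that fallback loop looked roughly like this (a sketch reusing the readers, key and ret from above; MapFile.Reader.get returns null when the key is not in that part):

// Brute force: probe every part file until the key turns up somewhere.
Writable found = null;
for (Reader reader : readers) {
    Writable candidate = reader.get(key, ret);
    if (candidate != null) {
        found = candidate;
        break;
    }
}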
So I investigated the importance of the HashPartitioner and found that getEntry uses it internally to identify which reader to use, but it seems that Spark is not using the same partitioning logic. So I modified to:
rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)
But again the two HashPartitioners did not match. So, the questions:

1. Is there a way to access the MapFiles efficiently (as this would ignore the partitioning logic)?
2. MapFileOutputFormat.getReaders(new Path(file), new Configuration()); is very slow. Can I identify the reader more efficiently?
3. Should I use a different HashPartitioner implementation?
4. getReaders also trips over the _SUCCESS marker, failing with _SUCCESS/data does not exist. Do I need to manually delete this file?

Any links about this would be greatly appreciated.
PS. If entries are sorted, then how is it possible to use the HashPartitioner to locate the correct Reader? That would imply that the data parts are hash-partitioned and then sorted by key within each part. So I also tried rdd.repartitionAndSortWithinPartitions(new HashPartitioner(280)), but again without any luck.
Digging into the issue, I found that the Spark HashPartitioner and Hadoop HashPartitioner have different logic.
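To make the mismatch concrete, here is a small comparison (a sketch; the class name, key value and partition count are mine, and it assumes a Text key on the Hadoop side, which already hashes differently than a String):

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class PartitionerMismatch {
    public static void main(String[] args) {
        int numPartitions = 128;
        String key = "some-key";

        // Hadoop side: (key.hashCode() & Integer.MAX_VALUE) % numPartitions,
        // computed on the Writable (Text) key.
        int hadoopPartition = new HashPartitioner<Text, Writable>()
                .getPartition(new Text(key), null, numPartitions);

        // Spark side: a non-negative modulo of key.hashCode() (null keys go to partition 0),
        // computed on whatever key object the RDD holds (here a String).
        int sparkPartition = new org.apache.spark.HashPartitioner(numPartitions)
                .getPartition(key);

        // These will generally not match: the formulas differ for negative hash codes,
        // and Text.hashCode() != String.hashCode() anyway.
        System.out.println("Hadoop: " + hadoopPartition + ", Spark: " + sparkPartition);
    }
}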
So the "brute force" solution I tried and works is the following.
Save the MapFile using rdd.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(num_of_partitions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);
Lookup using:
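Roughly the following (a sketch; it assumes Text keys, that the _SUCCESS marker is gone or filtered out, and that getReaders returns the parts sorted by name so index i corresponds to Spark partition i):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile.Reader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;

public class SparkPartitionedLookup {
    // Returns the value for 'key', or null if it is not present.
    public static Writable lookup(String dir, Text key, Writable value) throws Exception {
        Configuration conf = new Configuration();
        // Opens every part under 'dir', sorted by name, so index i is Spark partition i.
        Reader[] readers = MapFileOutputFormat.getReaders(new Path(dir), conf);
        // Use the same Spark partitioner that wrote the file to pick the right reader.
        org.apache.spark.HashPartitioner p = new org.apache.spark.HashPartitioner(readers.length);
        return readers[p.getPartition(key)].get(key, value);
    }
}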
This is "dirty" as the MapFile access is now bound to the Spark partitioner rather than the intuitive Hadoop HashPartitioner. I could implement a Spark partitioner that uses Hadoop HashPartitioner
to improve on though.
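Such a partitioner could look roughly like this (a sketch; the class name is mine, and it assumes the RDD keys are already Hadoop Writables such as Text, so their hashCode() matches what Hadoop sees at read time):

import org.apache.spark.Partitioner;

// Mirrors Hadoop's HashPartitioner formula: (hashCode & Integer.MAX_VALUE) % numPartitions.
public class HadoopHashPartitioner extends Partitioner {
    private final int numPartitions;

    public HadoopHashPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

Writing with rdd.repartitionAndSortWithinPartitions(new HadoopHashPartitioner(n)) should then produce parts that the stock MapFileOutputFormat.getEntry(readers, p, key, ret) lookup from the top of the question can locate.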
This also does not address the problem of slow access caused by the relatively large number of reducers. I could make this even "dirtier" by generating the part file name directly from the partitioner, but I am looking for a clean solution, so please post if there is a better approach to this problem.
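That "dirtier" variant would look roughly like this (a sketch; it assumes Text keys and the default part-r-NNNNN naming produced by the new Hadoop API, so adjust the format string if your parts are named differently):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class SinglePartLookup {
    // Opens only the one MapFile part the key can live in, instead of all of them.
    public static Writable lookup(String dir, Text key, Writable value, int numPartitions) throws Exception {
        Configuration conf = new Configuration();
        int partition = new org.apache.spark.HashPartitioner(numPartitions).getPartition(key);
        Path part = new Path(dir, String.format("part-r-%05d", partition));
        try (MapFile.Reader reader = new MapFile.Reader(part, conf)) {
            return reader.get(key, value); // null if the key is not present
        }
    }
}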