Search code examples
hadoopmapreduceapache-pighdfshadoop-yarn

Pig Script Generating Thousands of Maps


For some reason this script is spawning 60,000 map jobs on a small input:

A1 = LOAD '$directory1' USING CustomLoader AS key:chararray;
A = FOREACH A1 GENERATE CustomParser(key) AS key:chararray;

B = LOAD '$filename1' USING PigStorage AS (key:chararray);

result = JOIN A BY key, B BY key USING 'replicated';

directory1 has a few files that make up about 10000 lines of data, and filename1 also has ~10000 lines of data, all of which are essentially short strings. Both the directory and file are stored in HDFS. Neither is particularly large, on the scale of 10s-100s of kilobytes. However, when I run the script in hadoop, it spawns 60,000 map jobs. This leads to lots of other trouble - sometimes the application manager runs out of memory, sometimes it hangs during the shuffle stage, and other out of memory errors of various sorts.

It doesn't seem like it should be creating so many splits for such a small input. I've tried increasing the max.CombinedSplitSize, mapred.min.split.size, and dfs.block.size, but nothing affected the number of maps (which makes sense because it's a small number of small files I'm working with). I could potentially keep increasing the resources thrown at the job, but to some degree those values are out of my control.

It's possibly worth noting that this script works fine locally - it's only once it's running on the actual hadoop cluster and actually reading from HDFS that this issue occurs.

Has anyone else experience a similar issue, and if so, what did you change to fix the problem?


Solution

  • Turns out the problem was in my CustomLoader (something I wouldn't have expected). The loader gets to define its own splits, and was creating a huge number of splits, which translates to a huge number of maps. This custom loader explicitly didn't group splits together (although I think they might not group by default anyway), so even though many of the splits were empty or small they each spawned their own map job. Since the custom loader was being loaded after all of my configuration changes, it overrode the configurations that would have let me group splits.

    For those who are interested, I found the split problem in the subclassed InputFormat class's List<InputSplit> getSplits(final JobContext context) method, which is returned from InputFormat getInputFormat() in the custom subclass of LoadFunc.