
How to remove combiner output and keep only reducer output in mapreduce final output


Hi, I am running an application which reads records from HBase and writes them into text files.

I have used a combiner and a custom partitioner in my application. I have used 41 reducers because I need to create 40 reducer output files that satisfy the conditions in my custom partitioner.

Everything works fine, but when I use the combiner it creates a map output file per region, i.e. per mapper.

For example, I have 40 regions in my application, so 40 mappers get initiated and create 40 map-output files. But the reducers are not able to combine all the map output and generate the final reducer output, which should be 40 reducer output files.

The data in the files is correct, but the number of files has increased.

Any idea how I can get only the reducer output files?

import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {

    private Logger logger = Logger.getLogger(CommonCombiner.class);
    private MultipleOutputs<NullWritable, Text> multipleOutputs;
    String strName = "";
    private static final String DATA_SEPERATOR = "\\|\\!\\|";

    public void setup(Context context) {
        logger.info("Inside Combiner.");
        multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
    }

    @Override
    public void reduce(NullWritable Key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        for (Text value : values) {
            final String valueStr = value.toString();
            StringBuilder sb = new StringBuilder();
            if ("".equals(strName) && strName.length() == 0) {
                String[] strArrFileName = valueStr.split(DATA_SEPERATOR);
                String strFullFileName[] = strArrFileName[1].split("\\|\\^\\|");

                strName = strFullFileName[strFullFileName.length - 1];

                String strArrvalueStr[] = valueStr.split(DATA_SEPERATOR);
                if (!strArrvalueStr[0].contains(HbaseBulkLoadMapperConstants.FF_ACTION)) {
                    sb.append(strArrvalueStr[0] + "|!|");
                }
                multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);
                context.getCounter(Counters.FILE_DATA_COUNTER).increment(1);
            }

        }
    }

    public void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}
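
For context, the job wiring described above amounts to roughly the following driver sketch. MyCustomPartitioner and MyReducer are placeholders for the classes not shown in the question; only the calls relevant to the combiner/partitioner/reducer setup are spelled out.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JobDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "hbase-to-text");

        // The mapper side reads from HBase (e.g. via TableMapReduceUtil.initTableMapperJob,
        // which starts one mapper per region); that wiring is omitted here.

        job.setCombinerClass(CommonCombiner.class);          // the combiner shown above
        job.setPartitionerClass(MyCustomPartitioner.class);  // placeholder: custom partitioner
        job.setReducerClass(MyReducer.class);                // placeholder: existing reducer
        job.setNumReduceTasks(41);                           // 41 reducers for 40 output files

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(args[0]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}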

Solution

  • You aren't outputting any data from your combiner for the reducer to work with. In your combiner you're using:

    multipleOutputs.write(NullWritable.get(), new Text(sb.toString()), strName);

    That isn't how you write data out between stages, i.e. from a mapper or combiner to the reduce phase. You should be using:

    context.write()

    MultipleOutputs is just a way to write extra output files to disk when you need more than one; I've never seen it used in a combiner, because a combiner's output exists only to feed the reduce phase. A sketch of that restructuring is shown below.
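
    A minimal sketch of the restructuring, assuming the same NullWritable/Text types: the combiner only forwards (or partially aggregates) records through context.write(), and the MultipleOutputs logic moves into a reducer. CommonReducer is a hypothetical class name, and deriveFileName() is a placeholder for the record-to-file-name logic that currently lives in the combiner.

    import java.io.IOException;

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    // Combiner: only talks to context.write(), so its output feeds the reduce phase.
    public class CommonCombiner extends Reducer<NullWritable, Text, NullWritable, Text> {

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Any partial aggregation would happen here; whatever survives must be
            // emitted with context.write() so it reaches the reducers instead of
            // being written straight to disk.
            for (Text value : values) {
                context.write(key, value);
            }
        }
    }

    // Hypothetical reducer: this is where MultipleOutputs belongs, producing the
    // named per-file reducer output the question is after.
    class CommonReducer extends Reducer<NullWritable, Text, NullWritable, Text> {

        private MultipleOutputs<NullWritable, Text> multipleOutputs;

        @Override
        protected void setup(Context context) {
            multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
        }

        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // deriveFileName() stands in for the file-name extraction done in
                // the original combiner.
                multipleOutputs.write(NullWritable.get(), value, deriveFileName(value.toString()));
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            multipleOutputs.close();
        }

        // Placeholder for the record-to-file-name logic.
        private String deriveFileName(String record) {
            return "output";
        }
    }

    The driver would keep job.setCombinerClass(CommonCombiner.class) and register the reducer with job.setReducerClass(CommonReducer.class). The key point is that the framework may run a combiner zero, one, or several times on map output, so it must only emit through context.write() with the same key/value types as the mapper; anything written via MultipleOutputs inside it becomes an extra file per mapper, which is exactly the symptom described in the question.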