
Issue while Writing Multiple O/P Files in MapReduce


I have a requirement to split my input file into two output files based on a filter condition. My output directory should look like this:

/hdfs/base/dir/matched/YYYY/MM/DD
/hdfs/base/dir/notmatched/YYYY/MM/DD

I am using the MultipleOutputs class to split the data in my map function. In my driver class I set the output path like this:

FileOutputFormat.setOutputPath(job, new Path("/hdfs/base/dir"));

and in the Mapper I write like this:

mos.write(key, value, fileName); // fileName is generated based on the filter criteria
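For illustration, the filter-dependent base path could be derived like this (`buildOutputPath` is a hypothetical helper, not part of the original job; the actual filter logic is not shown in the question):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class OutputPathBuilder {
    // Hypothetical helper: builds the base output path passed as the third
    // argument of MultipleOutputs.write, e.g. "matched/2024/03/07/part".
    // MultipleOutputs resolves it relative to the job's output directory,
    // yielding /hdfs/base/dir/matched/YYYY/MM/DD/part-* files.
    public static String buildOutputPath(boolean matched, LocalDate date) {
        String prefix = matched ? "matched" : "notmatched";
        return prefix + "/" + date.format(DateTimeFormatter.ofPattern("yyyy/MM/dd")) + "/part";
    }

    public static void main(String[] args) {
        LocalDate day = LocalDate.of(2024, 3, 7);
        System.out.println(buildOutputPath(true, day));   // matched/2024/03/07/part
        System.out.println(buildOutputPath(false, day));  // notmatched/2024/03/07/part
    }
}
```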

This program works fine for a single day, but on the second day it fails with:

Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://nameservice1/hdfs/base/dir already exists

I cannot use a different base directory for the second day.

How can I handle this situation?

Note: I don't want to read the input twice to create the two separate files.


Solution

  • Create a custom output format class like the one below:

    package com.visa.util;
    
    import java.io.IOException;
    
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
    
    public class CostomOutputFormat<K, V> extends SequenceFileOutputFormat<K, V> {
    
        // No-op override: skips the check that throws FileAlreadyExistsException
        // when the output directory already exists.
        @Override
        public void checkOutputSpecs(JobContext arg0) throws IOException {
        }
    
        // The committer and record writer behave exactly as in the parent class.
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext arg0) throws IOException {
            return super.getOutputCommitter(arg0);
        }
    
        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext arg0) throws IOException, InterruptedException {
            return super.getRecordWriter(arg0);
        }
    
    }
    

    and use it in the driver class:

    job.setOutputFormatClass(CostomOutputFormat.class);
    

    This skips the check for the existence of the output directory, so the job no longer fails when /hdfs/base/dir already exists. Note that Hadoop then also stops protecting existing output from being overwritten, so the per-day paths generated for MultipleOutputs must be unique.
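For completeness, a sketch of how the driver could be wired up with this format (the driver and mapper class names here are illustrative, not from the original post; this is job configuration and needs a Hadoop cluster to actually run):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.visa.util.CostomOutputFormat;

public class SplitDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split-matched-notmatched");
        job.setJarByClass(SplitDriver.class);
        job.setMapperClass(SplitMapper.class); // hypothetical mapper that calls mos.write(key, value, fileName)
        job.setNumReduceTasks(0);              // map-only job
        job.setOutputFormatClass(CostomOutputFormat.class); // no output-dir existence check
        // The same base directory can now be reused across daily runs:
        FileOutputFormat.setOutputPath(job, new Path("/hdfs/base/dir"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```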