java hadoop mapreduce recordreader

Custom RecordReader initialize not called


I've recently started messing with Hadoop and just created my own InputFormat to handle PDFs.

For some reason my custom RecordReader class doesn't have its initialize method called. (I checked with a System.out.println, because I haven't set up a debugging environment.)

I'm running Hadoop 2.2.0 on Windows 7 32-bit and making my calls with yarn jar, as hadoop jar is bugged under Windows...

import ...

public class PDFInputFormat extends FileInputFormat<Text, Text>
{


        @Override
        public RecordReader<Text, Text> getRecordReader(InputSplit arg0,
                JobConf arg1, Reporter arg2) throws IOException 
                {
                    return new PDFRecordReader();
                }

        public static class PDFRecordReader implements RecordReader<Text, Text>
        {

            private FSDataInputStream fileIn;
            public String fileName=null;
            HashSet<String> hset=new HashSet<String>();

            private Text key=null;
            private Text value=null;

            private byte[] output=null;
            private int position = 0;

            @Override
            public Text createValue() {
                int endpos = -1;
                for (int i = position; i < output.length; i++){
                    if (output[i] == (byte) '\n') {
                        endpos = i;
                    }
                }
                if (endpos == -1) {
                    return new Text(Arrays.copyOfRange(output,position,output.length));
                }
                return new Text(Arrays.copyOfRange(output,position,endpos));
            }

            @Override
            public void initialize(InputSplit genericSplit, TaskAttemptContext job) throws
            IOException, InterruptedException
            {
                System.out.println("initialization is called");
                FileSplit split=(FileSplit) genericSplit;
                Configuration conf=job.getConfiguration();

                Path file=split.getPath();
                FileSystem fs=file.getFileSystem(conf);
                fileIn= fs.open(split.getPath());

                fileName=split.getPath().getName().toString();

                System.out.println(fileIn.toString());

                PDDocument docum = PDDocument.load(fileIn);

                ByteArrayOutputStream boss = new ByteArrayOutputStream();
                OutputStreamWriter ow = new OutputStreamWriter(boss);

                PDFTextStripper stripper=new PDFTextStripper();
                stripper.writeText(docum, ow);
                ow.flush();

                output = boss.toByteArray();

            }
        }


}

Solution

  • I figured this out last night, and it might help someone else:

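    RecordReader in org.apache.hadoop.mapred is the interface from Hadoop's older API, and it doesn't declare an initialize method, which explains why the framework never calls it.

    Extending the abstract RecordReader class in org.apache.hadoop.mapreduce (the newer API) lets you override that class's initialize method, which the framework calls before it reads any records. The input format then has to come from the same API: extend org.apache.hadoop.mapreduce.lib.input.FileInputFormat and override createRecordReader instead of getRecordReader.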
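
    For anyone who wants to see the shape of the new-API lifecycle without pulling in the Hadoop jars, here is a rough, self-contained sketch. SketchRecordReader, LineBufferReader and RecordReaderSketchDemo are hypothetical stand-ins, not Hadoop classes; the method names mirror initialize, nextKeyValue, getCurrentKey and getCurrentValue on org.apache.hadoop.mapreduce.RecordReader, but the real initialize takes (InputSplit, TaskAttemptContext) rather than a byte array. The buffer plays the role of the output field from the question (the text already extracted from the PDF), and the line-numbered key is my own assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the lifecycle of org.apache.hadoop.mapreduce.RecordReader.
// Not a Hadoop class: the real initialize takes (InputSplit, TaskAttemptContext).
abstract class SketchRecordReader<K, V> {
    abstract void initialize(byte[] splitContents);
    abstract boolean nextKeyValue();
    abstract K getCurrentKey();
    abstract V getCurrentValue();
}

// Emits one record per line of text that has already been extracted into a buffer.
final class LineBufferReader extends SketchRecordReader<Long, String> {
    private byte[] buffer = new byte[0];
    private int position = 0;
    private long lineNumber = 0;
    private String value = null;

    @Override
    void initialize(byte[] splitContents) {
        // All one-time setup happens here, before any record is requested.
        buffer = splitContents;
        position = 0;
        lineNumber = 0;
        value = null;
    }

    @Override
    boolean nextKeyValue() {
        if (position >= buffer.length) {
            value = null;
            return false; // buffer exhausted, no more records
        }
        // Stop at the first newline at or after the current position.
        int end = position;
        while (end < buffer.length && buffer[end] != (byte) '\n') {
            end++;
        }
        value = new String(buffer, position, end - position, StandardCharsets.UTF_8);
        position = end + 1; // advance past the newline so the next call moves on
        lineNumber++;
        return true;
    }

    @Override
    Long getCurrentKey() {
        return lineNumber;
    }

    @Override
    String getCurrentValue() {
        return value;
    }
}

final class RecordReaderSketchDemo {
    // Mimics the order in which a framework drives a reader:
    // initialize once, then pull records until nextKeyValue returns false.
    static List<String> readAll(SketchRecordReader<Long, String> reader, byte[] data) {
        reader.initialize(data);
        List<String> records = new ArrayList<>();
        while (reader.nextKeyValue()) {
            records.add(reader.getCurrentKey() + ":" + reader.getCurrentValue());
        }
        return records;
    }

    public static void main(String[] args) {
        byte[] text = "first line\nsecond line\nthird".getBytes(StandardCharsets.UTF_8);
        System.out.println(readAll(new LineBufferReader(), text));
    }
}
```

    The point of the sketch is the calling order: the caller only knows the methods declared on the base type, so initialize runs only because the base type declares it. Unlike createValue in the question, nextKeyValue stops at the first newline and advances position, so each call returns the next line rather than the same span.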