Tags: xml, python-2.7, hadoop, cloudera, hadoop-streaming

How to use StreamXmlRecordReader to parse single & multiline xml records within a single file


I have a text input file as below:

<a><b><c>val1</c></b></a>||<a><b><c>val2</c></b></a>||<a><b>
<c>val3</c></b></a>||<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

If you observe the input carefully, the xml data record after the third '||' is split across two lines.

I want to use hadoop streaming's StreamXmlRecordReader to parse this file:

-inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"

but I am unable to parse the 3rd record.
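
For reference, my full streaming command looks roughly like this (the input path and jar location are placeholders):

hadoop jar /path/to/hadoop-streaming.jar \
    -input /poc/test/input \
    -output /poc/testout001 \
    -mapper m1.py \
    -file /home/rsome/test/code/m1.py \
    -inputreader "org.apache.hadoop.streaming.StreamXmlRecordReader,begin=<a>,end=</a>,slowmatch=true"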

I am getting the error below:

Traceback (most recent call last):
  File "/home/rsome/test/code/m1.py", line 13, in <module>
    root = ET.fromstring(xml_str.getvalue())
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 964, in XML
    return parser.close()
  File "/usr/lib64/python2.6/xml/etree/ElementTree.py", line 1254, in close
    self._parser.Parse("", 1) # end of data
xml.parsers.expat.ExpatError: no element found: line 1, column 18478

I have used slowmatch=true as well but still no luck.

My output currently comes out as below

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b>
rec::4::mapper1
<c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

My expected output is

$ hdfs dfs -text /poc/testout001/part-*
rec::1::mapper1
<a><b><c>val1</c></b></a>
rec::2::mapper1
<a><b><c>val2</c></b></a>
rec::3::mapper1
<a><b><c>val3</c></b></a>
rec::1::mapper2
<a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>

Any help on this would be greatly appreciated.


Solution

  • Basically, the StreamXmlInputFormat, hadoop-streaming's default input format, extends KeyValueTextInputFormat, which splits records at newline characters (\r\n). That is not what I want here, since my record is split across multiple lines.

    Hence, to overcome this, I implemented my own input format extending FileInputFormat, where I am free to read past the newline characters (\r\n) while looking for my end tag.

    Usage:

    -libjars /path/to/custom-xml-input-format-1.0.0.jar \
    -D xmlinput.start="<a>" \
    -D xmlinput.end="</a>" \
    -inputformat "my.package.CustomXmlInputFormat"
    

    Here's the code I used.

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    
    
    public class CustomXmlInputFormat extends FileInputFormat<LongWritable, Text> {

      public static final String START_TAG_KEY = "xmlinput.start";
      public static final String END_TAG_KEY = "xmlinput.end";

      @Override
      public RecordReader<LongWritable, Text> getRecordReader(final InputSplit genericSplit,
                                          JobConf job, Reporter reporter) throws IOException {
        return new XmlRecordReader((FileSplit) genericSplit, job, reporter);
      }
    
    
      public static class XmlRecordReader implements RecordReader<LongWritable, Text> {
    
        private final byte[] endTag;
        private final byte[] startTag;
        private final long start;
        private final long end;
        private final FSDataInputStream fsin;
        private final DataOutputBuffer buffer = new DataOutputBuffer();
        private LongWritable currentKey;
        private Text currentValue;
    
        public XmlRecordReader(FileSplit split, JobConf conf, Reporter reporter) throws IOException {
          // Record delimiters come from the job configuration (-D xmlinput.start / xmlinput.end).
          startTag = conf.get(START_TAG_KEY).getBytes("UTF-8");
          endTag = conf.get(END_TAG_KEY).getBytes("UTF-8");

          // Process only this split, although readUntilMatch() may read past
          // `end` to finish a record that starts inside the split.
          start = split.getStart();
          end = start + split.getLength();
          Path file = split.getPath();
          FileSystem fs = file.getFileSystem(conf);
          fsin = fs.open(file);
          fsin.seek(start);
        }
    
    
        public boolean next(LongWritable key, Text value) throws IOException {
          // Scan for the next start tag within this split, then buffer bytes
          // (newlines excluded) up to and including the matching end tag.
          if (fsin.getPos() < end && readUntilMatch(startTag, false)) {
            try {
              buffer.write(startTag);
              if (readUntilMatch(endTag, true)) {
                key.set(fsin.getPos());
                value.set(buffer.getData(), 0, buffer.getLength());
                return true;
              }
            } finally {
              buffer.reset();
            }
          }
          return false;
        }
    
        public boolean readUntilMatch(byte[] match, boolean withinBlock)
            throws IOException {
          int i = 0;
          while (true) {
            int b = fsin.read();
            // End of file: no further record (or an unterminated one).
            if (b == -1) {
              return false;
            }

            // While inside a record, keep every byte except \r and \n; this is
            // what stitches together a record split across physical lines.
            if (withinBlock && b != (byte) '\r' && b != (byte) '\n') {
              buffer.write(b);
            }

            // Advance the tag match; reset on the first mismatched byte.
            if (b == match[i]) {
              i++;
              if (i >= match.length) {
                return true;
              }
            } else {
              i = 0;
            }

            // Outside a record, stop looking once we pass the split boundary;
            // a record that starts before the boundary may still run past it.
            if (!withinBlock && i == 0 && fsin.getPos() >= end) {
              return false;
            }
          }
        }
    
        @Override
        public float getProgress() throws IOException {
          // Guard against a zero-length split to avoid returning NaN.
          if (end == start) {
            return 0.0f;
          }
          return (fsin.getPos() - start) / (float) (end - start);
        }

        @Override
        public synchronized long getPos() throws IOException {
          return fsin.getPos();
        }
    
        @Override
        public LongWritable createKey() {
          return new LongWritable();
        }
    
        @Override
        public Text createValue() {
          return new Text();
        }
    
        @Override
        public synchronized void close() throws IOException {
          fsin.close();
        }
    
      }
    }
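
    In case you want to test this input format outside of streaming, a minimal old-API driver sketch would be as below (the CustomXmlDriver class name and the argument paths are just placeholders):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;

    public class CustomXmlDriver {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(CustomXmlDriver.class);
        job.setJobName("custom-xml-input");

        // Same record delimiters as the -D options above.
        job.set(CustomXmlInputFormat.START_TAG_KEY, "<a>");
        job.set(CustomXmlInputFormat.END_TAG_KEY, "</a>");

        job.setInputFormat(CustomXmlInputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Map-only: with no mapper set, the old API falls back to IdentityMapper,
        // so each output line is the record offset plus the reassembled XML.
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        JobClient.runJob(job);
      }
    }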
    

    Here's my output

    $ hdfs dfs -text /poc/testout001/part-*
    25      <a><b><c>val1</c></b></a>
    52      <a><b><c>val2</c></b></a>
    80      <a><b><c>val3</c></b></a>
    141     <a></b><c>val4-c-1</c><c>val4-c-2</c></b><d>val-d-1</d></a>