
Partition large XML files into subfiles in Python using SAX


I need to parse very large XML files (in the range of 3-5GB), which must be split into several smaller XML files according to data contained in the XML nodes.

Each input file includes several hundred thousand <measure> elements, like in this (very) simplified fragment.

    <items>
        <measure code="0810">
            <condition sequ="001" SID="-5041162"/>
            <footnote Id="00550"/>
            <footnote Id="00735"/>
        </measure>
        <measure code="6304">
            <component Id="01" national="1"/>
            <footnote Id="00001"/>
        </measure>
        <measure code="0811">
            <condition sequ="002" SID="-5041356"/>
            <footnote Id="00555"/>
        </measure>
        <measure code="2915">
            <component Id="01" national="0"/>
            <certif SID="-737740"/>
            <certif SID="-737780"/>
        </measure>
    </items>

The content of the actual <measure> elements can be almost any well-formed XML.

I need to do two processes while parsing these files:

  1. Extract information from the content of <measure> elements, and dump it to a MongoDB database (this part is solved...)
  2. Partition the original XML file into, say, 100 XML subfiles based on the first two digits of the "code" attribute of each <measure> node. That is, 100 new XML files (named 'part_00.xml' to 'part_99.xml') need to be created, and each <measure> element must be appended to the corresponding subfile. For example, <measure> blocks 1 and 3 in the sample should be copied to 'part_08.xml', block 2 to 'part_63.xml', and so on.
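
To make the partition rule concrete, here is a minimal sketch of the mapping from a "code" attribute to a subfile name. The function name `part_filename` and the validation of the prefix are assumptions added for illustration; they are not part of my actual code.

```python
def part_filename(code):
    """Return the subfile name for the 'code' attribute of a <measure>."""
    prefix = code[:2]
    # Assumption: every code starts with two digits, as in the sample data.
    if len(prefix) != 2 or not prefix.isdigit():
        raise ValueError("expected a code starting with two digits, got %r" % code)
    return "part_%s.xml" % prefix


for code in ("0810", "6304", "0811", "2915"):
    print(code, "->", part_filename(code))
```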

I'm using SAX to parse the original files, and process 1 above runs nicely. The pure skeleton of the SAX process is:

    import sys
    from xml.sax import ContentHandler
    from xml.sax import make_parser

    class ParseMeasures(ContentHandler):
        code = ''

        def startElement(self, name, attrs):
            if name == 'measure':
                self.code = attrs.get('code')

        def endElement(self, name):
            if name == 'measure':
                print('***Must append <measure> block to file part_{0}.xml'.format(self.code[:2]))

    def main(args):
        handler = ParseMeasures()
        sax_parser = make_parser()
        sax_parser.setContentHandler(handler)
        sax_parser.parse('my_large_xml.file.xml')
        print('Ended')

    if __name__ == '__main__':
        main(sys.argv[1:])

What I need is access to the whole <measure> XML element in 'endElement()', so that I can append it to the corresponding subfile.

Is there a way to combine SAX with other XML parsing functionality that would let me obtain the whole <measure> XML element in 'endElement()'? (I can handle the creation and management of the subfiles; that is not the problem.)

Or is SAX perhaps not the most suitable approach for this situation in the first place?

The "only" caveat is that the process should handle input files in the range of 3-5GB...


Solution

  • The following is a hybrid solution that uses the built-in SAX parser to generate parsing events, and lxml to build partial trees (<measure> elements only, and only one at a time).

    Once the element has been built, it is serialized by lxml's API for incremental XML generation into different files, depending on the value of @code.
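
The writers in the code below are generators used as coroutines: each one is primed with `next()`, receives elements through `send()`, and is shut down with `close()`. As a rough stand-alone illustration of that pattern only, here is a sketch in which plain strings stand in for lxml elements. The name `line_writer` and the use of `io.StringIO` in place of a real file are assumptions made for the example.

```python
import io


def line_writer(stream):
    """Generator used as a coroutine: receives strings through send()."""
    stream.write("<items>\n")
    try:
        while True:
            line = (yield)            # pause here until send() delivers a value
            stream.write(line + "\n")
    finally:
        # close() raises GeneratorExit at the yield, so this cleanup always runs
        stream.write("</items>\n")


out = io.StringIO()                   # stands in for a real output file
writer = line_writer(out)
next(writer)                          # prime: run up to the first yield
writer.send('<measure code="0810"/>')
writer.send('<measure code="0811"/>')
writer.close()                        # triggers the finally block
print(out.getvalue())
```

In the lxml version the two `with` blocks play the role of the `finally` clause: closing the generator leaves both blocks, which ends the `<items>` element and closes the file.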

    This code handles any level of nesting inside the <measure> elements, and text values including whitespace. It does not currently handle comments, processing instructions, or namespaces, but support for those could be added.
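
Text handling here relies on the ElementTree data model, which lxml shares: text directly after an element's start tag is stored in `.text`, and text following its end tag is stored in the `.tail` of that same element. The small sketch below uses the standard library's `xml.etree.ElementTree` (chosen only so the example has no third-party dependency) to show where whitespace ends up.

```python
import xml.etree.ElementTree as ET

fragment = '<measure code="0810">\n    <footnote Id="00550"/>\n</measure>'
measure = ET.fromstring(fragment)
footnote = measure[0]

print(repr(measure.text))   # whitespace between <measure ...> and <footnote/>
print(repr(footnote.text))  # the empty element carries no text of its own
print(repr(footnote.tail))  # whitespace between <footnote/> and </measure>
```

This is why `_add_text()` assigns buffered characters to `.text` when the last event was a start tag and to `.tail` otherwise.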

    Memory consumption should remain low even with large input files. lxml will add some overhead, but the iterative writing support is quite convenient. Doing this completely manually would be faster overall, but also more involved.

    from xml.sax import ContentHandler, make_parser
    from lxml import etree
    
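    class ParseMeasures(ContentHandler):
        def __init__(self):
            ContentHandler.__init__(self)
            self.stack = []     # chain of currently open elements inside a <measure>
            self.open = False   # True while the last tag seen was a start tag
            self.elem = None    # element that buffered text will be attached to
            self.writers = {}   # code prefix -> primed writer generator
            self.text = []      # buffered character data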
    
        def _get_writer(self, filename):
            with etree.xmlfile(filename) as xf:
                with xf.element('items'):
                    while True:
                        el = (yield)
                        xf.write(el)
                        xf.flush()  # maybe don't flush *every* write
    
        def _write(self):
            grp = self.elem.attrib['code'][0:2]
    
            if grp in self.writers:
                writer = self.writers[grp]
            else:
                writer = self.writers[grp] = self._get_writer('part_%s.xml' % grp)
                next(writer)        # run up to `yield` and wait
    
            writer.send(self.elem)  # write out current `<measure>`
            self.elem = None
    
        def _add_text(self):
            if self.elem is not None and self.text:
                if self.open:
                    self.elem.text = ''.join(self.text)
                else:
                    self.elem.tail = ''.join(self.text)
                self.text = []
    
        def startElement(self, name, attrib):
            if self.stack or name == 'measure':
                self._add_text()
                self.open = True
                self.elem = etree.Element(name, attrib)
                self.stack.append(self.elem)
                if len(self.stack) > 1:
                    self.stack[-2].append(self.elem)
    
        def characters(self, content):
            if self.elem is not None:
                self.text.append(content)
    
        def endElement(self, name):
            if self.stack:
                self._add_text()
                self.open = False
                self.elem = self.stack.pop()            
                if not self.stack:
                    self._write()
    
        def endDocument(self):
            # close every writer so lxml ends <items> and closes the file
            for writer in self.writers.values():
                writer.close()
    
    
    def main():
        sax_parser = make_parser()
        sax_parser.setContentHandler(ParseMeasures())
        sax_parser.parse(r'test.xml')
    
    if __name__ == '__main__':
        main()
    

    This generates part_08.xml

    <items>
        <measure code="0810">
            <condition sequ="001" SID="-5041162"/>
            <footnote Id="00550"/>
            <footnote Id="00735"/>
        </measure>
        <measure code="0811">
            <condition sequ="002" SID="-5041356"/>
            <footnote Id="00555"/>
        </measure>
    </items>
    

    and part_29.xml

    <items>
        <measure code="2915">
            <component Id="01" national="0"/>
            <certif SID="-737740"/>
            <certif SID="-737780"/>
        </measure>
    </items>
    

    and part_63.xml

    <items>
        <measure code="6304">
            <component Id="01" national="1"/>
            <footnote Id="00001"/>
        </measure>
    </items>
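
The answer mentions that doing this completely manually would be faster but more involved. One possible shape of such a manual variant, using only the standard library, forwards the SAX events of each <measure> straight to one `xml.sax.saxutils.XMLGenerator` per output file, so no tree is built at all. This is a sketch under assumptions, not a drop-in replacement: the class name `SplitMeasures` and the `open_stream` callback are hypothetical, whitespace between <measure> elements is dropped, and namespaces, comments, and processing instructions are not handled.

```python
import io
from xml.sax import ContentHandler, parseString
from xml.sax.saxutils import XMLGenerator


class SplitMeasures(ContentHandler):
    """Forward the SAX events of each <measure> to a per-prefix XMLGenerator."""

    def __init__(self, open_stream):
        ContentHandler.__init__(self)
        self.open_stream = open_stream  # hypothetical callback: prefix -> text stream
        self.generators = {}            # code prefix -> XMLGenerator
        self.current = None             # generator receiving the current <measure>
        self.depth = 0                  # nesting depth inside the current <measure>

    def startElement(self, name, attrs):
        if self.current is None:
            if name != 'measure':
                return                  # ignore everything outside <measure>
            prefix = attrs.get('code', '')[:2]
            if prefix not in self.generators:
                gen = XMLGenerator(self.open_stream(prefix), encoding='utf-8')
                gen.startElement('items', {})   # open the root of the subfile
                self.generators[prefix] = gen
            self.current = self.generators[prefix]
        self.depth += 1
        self.current.startElement(name, attrs)

    def characters(self, content):
        if self.current is not None:
            self.current.characters(content)

    def endElement(self, name):
        if self.current is not None:
            self.current.endElement(name)
            self.depth -= 1
            if self.depth == 0:         # the <measure> itself has just been closed
                self.current = None

    def endDocument(self):
        for gen in self.generators.values():
            gen.endElement('items')     # close the root of every subfile


# Demo with in-memory streams instead of real files
streams = {}

def open_stream(prefix):
    streams[prefix] = io.StringIO()
    return streams[prefix]

sample = (b'<items><measure code="0810"><footnote Id="1"/>x</measure>'
          b'<measure code="6304"/>'
          b'<measure code="0811"><a><b>t</b></a></measure></items>')
parseString(sample, SplitMeasures(open_stream))
for prefix in sorted(streams):
    print(prefix, streams[prefix].getvalue())
```

For real files, `open_stream` could return `open('part_%s.xml' % prefix, 'w', encoding='utf-8')`, with the caller closing those files once parsing has finished.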