Search code examples
transactionsplonedata-migrationzodbtransmogrifier

How to commit subtransactions to the ZODB when running a transmogrifier pipeline?


I'm importing content to Plone using a transmogrifier pipeline but, from time to time something can go wrong and an exception could raise. When that happens I need to restart the whole process again because the transaction is completely aborted. This is simply not acceptable with a batch of more than 100,000 items.

I'm using the collective.transmogrifier.sections.savepoint section, but that only sets a savepoint by using transaction.savepoint(optimistic=True).

I want to know if a new section with something like this will help me solve the problem:

from zope.interface import classProvides, implements
from collective.transmogrifier.interfaces import ISectionBlueprint
from collective.transmogrifier.interfaces import ISection
import transaction

class CommitSection(object):
    classProvides(ISectionBlueprint)
    implements(ISection)

    def __init__(self, transmogrifier, name, options, previous):
        self.every = int(options.get('every', 1000))
        self.previous = previous

    def __iter__(self):
        count = 0
        for item in self.previous:
            count = (count + 1) % self.every
            if count == 0:
                transaction.commit()
            yield item

Solution

  • I think so, the code below is from collective.jsonmigrator.partialcommit, which always worked pretty well for me IIRC.
    You could always import collective.jsonmigrator & use that blueprint of course instead of coding your own.

    class PartialCommit(object):
    
        classProvides(ISectionBlueprint)
        implements(ISection)
    
        def __init__(self, transmogrifier, name, options, previous):
            self.previous = previous
            self.step = int(options.get('every', 100))
    
        def __iter__(self):
            count = 1
            for item in self.previous:
                yield item
                if count % self.step == 0:
                    transaction.commit()
                count += 1