Tags: python, multiprocessing, pickle, spacy, python-decorators

Serializing a custom function in spaCy for multiprocessing


I have a custom entity scrubber function that tells the textrank algorithm (which I'm using to extract key phrases) to avoid tagging certain entities as key phrases. I register this function with spaCy using:

import spacy
from spacy.tokens import Span

@spacy.registry.misc("entity_scrubber")
def articles_scrubber():
    def scrubber_func(span: Span) -> str:
        for token in span:
            if token.ent_type_ in ['CARDINAL', 'DATE', 'MONEY', 'ORDINAL', 'PERCENT',
                                   'PERSON', 'QUANTITY', 'TIME']:
                # ignore phrases containing these named-entity types
                return "INELIGIBLE_PHRASE"
        return span.text
    return scrubber_func

I add the textrank component with the following line, passing the custom function via the config parameter:

nlp.add_pipe("textrank", config={"scrubber": {"@misc": "entity_scrubber"}})

I try to process the docs using the nlp.pipe method and spaCy's multiprocessing support by passing the n_process parameter:

nlp.pipe(docs, n_process=8, disable=['tok2vec','tagger','lemmatizer', 'attribute_ruler'])

But I get the following error:

AttributeError: Can't pickle local object 'articles_scrubber.<locals>.scrubber_func'

After searching online, I found that functions decorated this way (as nested functions) aren't picklable. I found some posts (http://gael-varoquaux.info/programming/decoration-in-python-done-right-decorating-and-pickling.html, https://towardsdatascience.com/why-you-should-wrap-decorators-in-python-5ac3676835f9) that show how to write decorators so the decorated functions remain picklable, but I couldn't find any approach specific to spaCy.
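
For example, the same error reproduces with plain pickle and a nested function, independent of spaCy, since pickle serializes a function by reference to its qualified name:

import pickle

def outer():
    def inner():
        pass
    return inner

pickle.dumps(outer)    # works: resolvable by name as __main__.outer
pickle.dumps(outer())  # AttributeError: Can't pickle local object 'outer.<locals>.inner'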

Can someone point me in the right direction on how to approach this?


Solution

  • The problem has nothing to do with the function being decorated; rather, the actual worker function that will be invoked in the new process is not at global scope. In the following demo I have a decorator time_it that prints the running time of the function it decorates. I use it to decorate both get_worker_function, which returns the worker function that will run in a child process, and that worker function, foo, itself. There is no problem when the worker function is at global scope:

    from timing import time_it
    from multiprocessing import Process
    
    @time_it
    def foo():
        print("It works!")
    
    @time_it
    def get_worker_function():
        # Return actual worker function
        return foo
    
    if __name__ == '__main__':
        worker_function = get_worker_function()
        p = Process(target=worker_function)
        p.start()
        p.join()
    

    Prints:

    func: get_worker_function args: [(), {}] took: 6e-07 sec.
    It works!
    func: foo args: [(), {}] took: 0.0001581 sec.
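
    (The timing module isn't included here; a minimal sketch of a time_it decorator consistent with the output above might look like the following. Note functools.wraps: by copying foo's __qualname__ onto the wrapper, it is also what keeps the decorated global function picklable by name.)

    import time
    from functools import wraps

    def time_it(func):
        @wraps(func)  # preserves __qualname__, so pickling by name still resolves
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = func(*args, **kwargs)
            elapsed = round(time.perf_counter() - start, 7)
            print(f"func: {func.__name__} args: [{args}, {kwargs}] took: {elapsed} sec.")
            return result
        return wrapper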
    

    But if the worker function is not at global scope, then you get an error:

    from timing import time_it
    from multiprocessing import Process
    
    
    @time_it
    def get_worker_function():
        # Return actual worker function
        def foo():
            print("It works!")
        return foo
    
    if __name__ == '__main__':
        worker_function = get_worker_function()
        p = Process(target=worker_function)
        p.start()
        p.join()
    

    Prints:

    func: get_worker_function args: [(), {}] took: 7e-07 sec.
    Traceback (most recent call last):
      File "C:\Booboo\test\test.py", line 15, in <module>
        p.start()
      File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 121, in start
        self._popen = self._Popen(self)
      File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 224, in _Popen
        return _default_context.get_context().Process._Popen(process_obj)
      File "C:\Program Files\Python38\lib\multiprocessing\context.py", line 327, in _Popen
        return Popen(process_obj)
      File "C:\Program Files\Python38\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
        reduction.dump(process_obj, to_child)
      File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 60, in dump
        ForkingPickler(file, protocol).dump(obj)
    AttributeError: Can't pickle local object 'get_worker_function.<locals>.foo'
    

    Solution?

    Try changing your code to this:

    import spacy
    from spacy.tokens import Span

    def scrubber_func(span: Span) -> str:
        for token in span:
            if token.ent_type_ in ['CARDINAL', 'DATE', 'MONEY', 'ORDINAL', 'PERCENT',
                                   'PERSON', 'QUANTITY', 'TIME']:
                # ignore phrases containing these named-entity types
                return "INELIGIBLE_PHRASE"
        return span.text

    @spacy.registry.misc("entity_scrubber")
    def articles_scrubber():
        return scrubber_func
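
    With scrubber_func at module level, the child processes can pickle it by its qualified name, and your original nlp.pipe call with n_process should work unchanged. A minimal end-to-end sketch (assuming the textrank component comes from the pytextrank package, as the scrubber config suggests, and that the en_core_web_sm model is installed; the sample docs are placeholders):

    import spacy
    from spacy.tokens import Span
    import pytextrank  # noqa: F401  (importing it registers the "textrank" component)

    def scrubber_func(span: Span) -> str:
        # Module-level, so worker processes can pickle it by qualified name
        for token in span:
            if token.ent_type_ in ['CARDINAL', 'DATE', 'MONEY', 'ORDINAL', 'PERCENT',
                                   'PERSON', 'QUANTITY', 'TIME']:
                return "INELIGIBLE_PHRASE"
        return span.text

    @spacy.registry.misc("entity_scrubber")
    def articles_scrubber():
        return scrubber_func

    if __name__ == '__main__':
        nlp = spacy.load("en_core_web_sm")
        nlp.add_pipe("textrank", config={"scrubber": {"@misc": "entity_scrubber"}})
        docs = ["Apple hired 100 engineers in January.",
                "TextRank builds a graph of candidate phrases."]
        for doc in nlp.pipe(docs, n_process=2):
            print([phrase.text for phrase in doc._.phrases])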