Search code examples
Tags: python, dictionary, multiprocessing, python-multiprocessing, multiprocessing-manager

Modify a list in Multiprocessing pool's manager dict


I have a list of elements that I process in a multiprocessing apply_async task. As each element is processed, I want to append it to a list stored under a key in a manager dict, so that the key ends up mapped to the whole list of processed elements.

I tried the following code:

#!/usr/bin/python

from multiprocessing import Pool, Manager

def spammer_task(d, my_list):
    """Record each element of my_list under d['task']['processed_list'].

    d is the mapping to update (a Manager().dict() proxy in this script) and
    my_list is the iterable of elements to process. Returns None.
    """
    # Initialize the 'task' entry with an empty result list
    d['task'] = {
        'processed_list': []
    }

    for ele in my_list:
        #process here
        # NOTE: when d is a manager dict proxy, d['task'] hands back a copy of
        # the stored value, so this append changes only that temporary copy
        # and the list held by the manager stays empty.
        d['task']['processed_list'].append(ele)

    return

p = Pool()
m = Manager()
d = m.dict()

my_list = ["one", "two", "three"]

# NOTE: spammer_task(d, my_list) is evaluated right here in the parent
# process, and its return value (None) is what apply_async receives; the
# function object itself is never handed to the pool.
p.apply_async(spammer_task (d, my_list))
print d

At the end, the dict simply contains an empty list. Output:

{'task': {'processed_list': []}}

After researching a bit, I learned that elements inside a manager dict effectively become immutable, so you have to re-assign the whole entry with new data in order to update it. So I tried the following code, and it gives a weird error.

#!/usr/bin/python

from multiprocessing import Pool, Manager

def spammer_task(d, my_list):
    # Store a fresh 'task' entry in d, then try to add each element of my_list to it
    d['task'] = {
        'processed_list': []
    }

    for ele in my_list:
        #process here
        old_list = d['task']['processed_list']  # fetch the list currently stored under 'task'
        new_list = old_list.append(ele)  # list.append mutates in place and returns None, so new_list is None
        # Reassign the whole 'task' value so the manager dict sees the update;
        # because new_list is None, the next iteration reads None back as old_list.
        d['task'] = {
            'processed_list': new_list
        }

    return

p = Pool()
m = Manager()
d = m.dict()

my_list = ["one", "two", "three"]

p.apply_async(spammer_task (d, my_list))  # spammer_task(...) is evaluated here in the parent process, before apply_async is entered
print d

Output:

Traceback (most recent call last):
  File "./a.py", line 29, in <module>
    p.apply_async(spammer_task (d, my_list))
  File "./a.py", line 14, in spammer_task
    new_list = old_list.append(ele)
AttributeError: 'NoneType' object has no attribute 'append'

Somehow it seems to be appending None to the list, and I can't figure out why.


Solution

  • According to the solution at https://bugs.python.org/issue6766

    The following code fixes it by copying the whole task dict, modifying the copy, and then assigning it back:

    #!/usr/bin/python
    
    from multiprocessing import Pool, Manager
    
    def spammer_task(d, my_list):
        """Append every element of my_list to d['task']['processed_list'].

        The 'task' entry is fetched, changed, and stored back on every
        iteration rather than mutated through d['task'] directly, so that a
        manager dict proxy passed as d registers each update through the
        key assignment. Returns None.
        """
        d['task'] = {'processed_list': []}

        for item in my_list:
            #process here
            task_entry = d['task']                     # local copy of the entry
            task_entry['processed_list'].append(item)  # change the copy
            d['task'] = task_entry                     # write it back under the key
    
    if __name__ == '__main__':
        # The guard keeps this setup from running again when the module is
        # imported by a child process (required on spawn-based platforms).
        p = Pool()
        m = Manager()
        d = m.dict()

        my_list = ["one", "two", "three"]

        try:
            # Pass the function and its arguments separately so that a pool
            # worker runs it; writing spammer_task(d, my_list) here would
            # execute it in the parent and submit its return value (None).
            result = p.apply_async(spammer_task, (d, my_list))
            # Block until the task has finished; get() also re-raises any
            # exception raised inside the worker.
            result.get()
        finally:
            # Stop accepting work and wait for the workers to exit.
            p.close()
            p.join()

        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(d)
    

    Output:

    {'task': {'processed_list': ['one', 'two', 'three']}}