
Mutate a shared object in Python multiprocessing


Assuming that there is a dummy.txt file that holds the following information:

a_to_b_from_c 20
a_to_b_from_w 30
a_to_b_from_k 20
a_to_b_from_l 10
c_to_f_from_e 30
c_to_f_from_k 20
c_to_f_from_l 10

(The numerical values are only 10, 20, and 30.) And the following code:

import multiprocessing 

global conns 

class Line():
    def __init__(self, text, group) -> None:
        self.text = text 
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():

    def __init__(self, name ) -> None: 
        self.name = name 
        self.groups = {k:set() for k in [10,20,30]}

    def add_to_dict(self,line : Line):
        
        connection = line.get_link()
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        
        self.groups[connection].add(line.text)

def thread_f(item : Line):

    # Update the dictionary of every Group object accordingly
    global conns
 
    key = item.get_link()
     
    conns[key].add_to_dict(item)
    
def main():

    global conns 

    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:

        info = [ Line(*line.strip().split()) for line in f]

    # Update the global (shared) object and initialize a dictionary 
    # this has the following initialization: 
    # { a_to_b : set(), c_to_f : set() }
    conns = { k : Groups(k) for k in {x.get_link() for x in info} }

    # Update the shared object according to the iterable information
    with multiprocessing.Pool(5) as pool:

        res = pool.map(thread_f,     # add to the appropriate key the items 
                        info,        # the lines 
                        chunksize=1) # respect the order

    # Display the Results        
    for group_name, group_obj in conns.items():

        print(f"Grp Name {group_name} has the following:")

        for cost, connections in group_obj.groups.items():

            print(cost,connections)
        

if __name__ == "__main__":
    main()

What I am trying to do is first parse the file and generate a Line object for every line. After the parsing is done, I update the global variable conns, which I intend to use as a shared variable for all the workers of the pool. Then, in the thread_f function, I update the global dictionary by adding each Line to the dictionary field of the appropriate Groups object.

The problem is that when I try to display the information, nothing gets displayed. Instead, I get a collection of empty sets:

Grp Name a_to_b has the following:
10 set()
20 set()
30 set()
Grp Name c_to_f has the following:
10 set()
20 set()
30 set()

Instead, I was expecting the following:

Grp Name a_to_b has the following:
10 set(a_to_b_from_l)
20 set(a_to_b_from_c,a_to_b_from_k)
30 set(a_to_b_from_w)
Grp Name c_to_f has the following:
10 set(c_to_f_from_l)
20 set(c_to_f_from_k)
30 set(c_to_f_from_e)

Since Python multiprocessing practically uses a fork approach (at least on Linux), I do understand that the child processes have access to the parent's already-initialized data, but their changes have no effect back in the parent. After reading the docs and searching on S.O., I found out about the Manager objects of the multiprocessing package. The thing is that I am unable to generate a Manager.dict() that is already initialized (like I have done in my case with the conns comprehension).
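
For illustration, here is a minimal sketch of that fork behavior (assuming a fork-based start method, as on Linux): each worker mutates its own copy of a module-level dictionary, and the parent never sees the change.

import multiprocessing

d = {"count": 0}

def worker(_):
    d["count"] += 1  # mutates this child's copy only

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        pool.map(worker, range(4))
    print(d["count"])  # prints 0: the parent's dict is untouched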

How can I achieve the aforementioned desired behavior?

Yes but why Multiprocessing?

Well, this example is a mere MWE that I created to mimic what my actual code does. As a matter of fact, I am trying to speed up code that does not scale well for really large input files.

Regarding the Manager

Driven by a slightly similar question here [1], I did not manage to find a way of initializing a Manager.dict() from a pre-existing, i.e. already initialized, dictionary to pass into the spawned processes. Thus, I used sets, which guarantee that there will be no duplicate entries, and a global, already initialized variable to be continuously updated by the processes.
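
(As the solution below shows, a Manager().dict() can in fact be seeded from a pre-existing dictionary, since the dict() factory accepts the same constructor arguments as a plain dict. A minimal sketch:)

import multiprocessing

if __name__ == "__main__":
    plain = {"a_to_b": set(), "c_to_f": set()}
    manager = multiprocessing.Manager()
    conns = manager.dict(plain)  # a DictProxy seeded with the existing entries
    print(dict(conns))           # {'a_to_b': set(), 'c_to_f': set()}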

A work-around for the proposed Manager approach ?

Well, since the computational effort increases with the usage of a shared resource (the Manager.dict object), could a potential work-around be the following?

The idea is to:

  • Avoid the usage of shared resources amongst processes
  • Thus, avoid race conditions

So, assume that we have a working pool of X processes in total. The task of each worker is to categorize each Line into the appropriate Groups object. The Lines are given to the workers as a list[Line] object. Thus, what if we use a somewhat divide-and-conquer approach like this:

+--------------------------------------------------------------+
|                        list[Line]                            |
+--------------------------------------------------------------+
|        |        |        |        |        |        |        |
|        |        |        |        |        |        |        |
  <-d->    <-d->    <-d->     ...                       <-d->

The list is divided into X independent chunks/slices/sublists of size len(list)/X. Then, each of these iterables is given to the workers for processing. But now the thread_f function has to be modified accordingly.

It shall:

  • Generate a dictionary of Groups objects, where key = line.group and value = Groups object
  • Fill this dictionary according to the Line objects of its given chunk/slice/sublist.
  • Return the dictionary

After the pool of processes has finished, the results, i.e. the dictionaries, have to be merged into one that holds the final solution.
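
A minimal sketch of this idea, using the hypothetical helpers chunk_worker and merge_results (it reuses the Line and Groups classes from above, keys the per-worker dictionaries by link to mirror the final conns layout, and assumes the corrected add_to_dict from the solution below, which groups by line.group):

import multiprocessing

def chunk_worker(chunk):
    # Build a private {link: Groups} dictionary for this slice of Lines:
    # no shared state between workers, hence no race conditions.
    local = {}
    for line in chunk:
        key = line.get_link()
        if key not in local:
            local[key] = Groups(key)
        local[key].add_to_dict(line)
    return local

def merge_results(partials):
    # Fold the per-chunk dictionaries into the final one.
    merged = {}
    for partial in partials:
        for key, grp in partial.items():
            if key not in merged:
                merged[key] = grp
            else:
                for cost, texts in grp.groups.items():
                    merged[key].groups.setdefault(cost, set()).update(texts)
    return merged

# Usage sketch (info is the list[Line] parsed in main, X the pool size;
# strided slices are used here for brevity, contiguous ones work equally well):
# chunks = [info[i::X] for i in range(X)]
# with multiprocessing.Pool(X) as pool:
#     conns = merge_results(pool.map(chunk_worker, chunks))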


Solution

  • First, I believe you have an error in method Groups.add_to_dict. I have commented out the erroneous statement and added the correct one after it:

    import multiprocessing
    
    def init_processes(d, the_lock):
        global conns, lock
        conns, lock = d, the_lock
    
    class Line():
        def __init__(self, text, group) -> None:
            self.text = text
            self.group = int(group)
    
        def get_link(self):
            return self.text.split('_from')[0]
    
        def __repr__(self):
            return f"<{self.group},{self.text}>"
    
    class Groups():
    
        def __init__(self, name ) -> None:
            self.name = name
            self.groups = {k:set() for k in [10,20,30]}
    
        def add_to_dict(self,line : Line):
    
            #connection = line.get_link()
            connection = line.group
            if connection not in self.groups.keys():
                self.groups[connection] = set()
    
            self.groups[connection].add(line.text)
    
    
    def thread_f(item : Line):
    
        # Update the dictionary of every Group object accordingly
        global conns # Not strictly necessary
    
        key = item.get_link()
    
        # We need to let the managed dict know there is an updated value for the key:
        """
        conns[key].add_to_dict(item)
        """
        with lock:
            the_set = conns[key]
            the_set.add_to_dict(item)
            conns[key] = the_set # reset the reference
    
    def main():
    
        # Parse the file and store the information in an iterable
        with open('dummy.txt') as f:
    
            info = [ Line(*line.strip().split()) for line in f]
    
        # Update the global (shared) object and initialize a dictionary
        # this has the following initialization:
        # { a_to_b : set(), c_to_f : set() }
        conns = multiprocessing.Manager().dict(
            { k : Groups(k) for k in {x.get_link() for x in info} }
        )
    
        # Update the shared object according to the iterable information
        lock = multiprocessing.Lock()
        with multiprocessing.Pool(5, initializer=init_processes, initargs=(conns, lock)) as pool:
    
            res = pool.map(thread_f,     # add to the appropriate key the items
                            info,        # the lines
                            chunksize=1) # respect the order
    
        # Display the Results
        for group_name, group_obj in conns.items():
    
            print(f"Grp Name {group_name} has the following:")
    
            for cost, connections in group_obj.groups.items():
    
                print(cost,connections)
    
    
    if __name__ == "__main__":
        main()
    

    Prints:

    Grp Name c_to_f has the following:
    10 {'c_to_f_from_l'}
    20 {'c_to_f_from_k'}
    30 {'c_to_f_from_e'}
    Grp Name a_to_b has the following:
    10 {'a_to_b_from_l'}
    20 {'a_to_b_from_k', 'a_to_b_from_c'}
    30 {'a_to_b_from_w'}
    

    Update

    I may be off base here, but it seems that most of the work I see is in the parsing of the input line. In your real case, whatever that might be, the per-line processing might represent a non-negligible portion of your total processing (and if it doesn't, then, as I have previously mentioned in a comment, multiprocessing is not appropriate for this problem), but I see no reason not to move that processing to the multiprocessing pool itself.

    I have greatly refactored the code, moving the line parsing into the ProcessedLine class (which replaces Line), and I no longer see the need for the Groups class, since the merging of dictionaries and sets is done by the main process.

    I am using the method imap_unordered rather than imap, since it is generally slightly more efficient, and your previous code, which did not use the return values from the map method, did not depend on the order in which results were generated. So keys can be added to the dictionary in arbitrary order. And why should order even matter in a dictionary to begin with?

    You should be aware that if your input file has very many lines and your worker function requires non-trivial processing, you can fill the multiprocessing task queue much faster than the processes can empty it, and you will potentially exhaust memory. I do have a solution to that, but that is another story.
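
    (One common pattern for keeping memory bounded, a sketch only and not necessarily the solution alluded to above: submit the input in fixed-size batches. Here process_in_batches is a hypothetical helper and process_line is the worker defined in the code below.)

    from itertools import islice
    import multiprocessing

    def process_in_batches(pool, path, batch_size=10_000):
        # Submit at most batch_size lines at a time so the task queue
        # (and the list of pending results) stays bounded; the pool may
        # idle briefly between batches, trading throughput for memory.
        results = []
        with open(path) as f:
            while True:
                batch = list(islice(f, batch_size))
                if not batch:
                    break
                results.extend(pool.imap_unordered(process_line, batch))
        return results

    The full refactored code follows: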

    import multiprocessing
    
    
    class ProcessedLine():
        def __init__(self, text : str) -> None:
            # Parse the raw line: "<text> <group>"
            self.text, group = text.strip().split()
            self.group = int(group)
            # Everything before '_from' identifies the link, e.g. 'a_to_b'
            self.link = self.text.split('_from')[0]
            # A one-entry dict that can be merged directly into the results
            self.dict = {self.link: {self.group: {self.text}}}
    
    def process_line(text : str):
        processed_line = ProcessedLine(text)
        return processed_line
    
    def compute_chunksize(iterable_size, pool_size):
        chunksize, remainder = divmod(iterable_size, 4 * pool_size)
        if remainder:
            chunksize += 1
        return chunksize
    
    def main():
    
        def generate_lines():
            with open('dummy.txt') as f:
                for line in f:
                    yield line
    
        ESTIMATED_NUMBER_OF_LINES_IN_FILE = 7
        POOL_SIZE = min(ESTIMATED_NUMBER_OF_LINES_IN_FILE, multiprocessing.cpu_count())
        # chunksize to be used with imap_unordered:
        chunksize = compute_chunksize(ESTIMATED_NUMBER_OF_LINES_IN_FILE, POOL_SIZE)
        pool = multiprocessing.Pool(POOL_SIZE)
        # Specify a chunksize value if the size of the iterable is large
        results = {}
        for processed_line in pool.imap_unordered(process_line, generate_lines(), chunksize=chunksize):
            link = processed_line.link
            if link not in results:
                # Just update with the entire dictionary
                results.update(processed_line.dict)
            else:
                # Update the set dictionary:
                set_dict = results[link]
                set_key = processed_line.group
                if set_key in set_dict:
                    set_dict[set_key].add(processed_line.text)
                else:
                    #set_dict[set_key] = set(processed_line.text)
                    set_dict[set_key] = processed_line.dict[link][set_key]
        pool.close()
        pool.join()
    
        for group_name, groups in results.items():
            print(f'Group Name {group_name} has the following:')
            for k, v in groups.items():
                print('   ', k, '->', v)
            print()
    
    if __name__ == "__main__":
        main()
    

    Prints:

    Group Name a_to_b has the following:
        20 -> {'a_to_b_from_c', 'a_to_b_from_k'}
        30 -> {'a_to_b_from_w'}
        10 -> {'a_to_b_from_l'}
    
    Group Name c_to_f has the following:
        30 -> {'c_to_f_from_e'}
        20 -> {'c_to_f_from_k'}
        10 -> {'c_to_f_from_l'}