python, graph, networkx, global, concurrent-processing

Can I create a "global", fully connected graph in Python that is concurrently updated by the worker processes of a Pool.map() call?


I want to build a fully connected graph in parallel in Python and also collect the edge values, like:
( node1, node2 ) = edge_value
stored in a dictionary of the format:
{ ( node1, node2 ) : edge_value [, ... [, ... ] ] }

To do this I first initialize two global variables, G for the graph and f_correlation for the dictionary:

import networkx as nx
from multiprocessing import Pool
G = nx.Graph()
f_correlation = {}

A function is then created to construct the graph and also store the ( node1, node2 ) = edge_value entries in the f_correlation dictionary:

def construct_graph_parallelly(pair_with_df):
    global G
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[ (i, j) ] = calculate_value(df, i, j)    # this function calculates some value on the dataframe
    # here i and j are nodes in the graph
    G.add_edge(i, j, weight = f_correlation[ (i, j) ])
    return f_correlation

Then a multiprocessing.Pool() instance is created and its .map() method is called, to let the code execute concurrently:

def make_all_pair_with_df(node_list, df):
    all_pair_with_df = []
    for i in node_list:
        for j in node_list:
            if i != j :
                pair_with_df = (i,j),df
                all_pair_with_df.append(pair_with_df)

    return all_pair_with_df

node_list = ['a', 'b', 'c', 'd', 'e']
pool = Pool()
all_pair_with_df = make_all_pair_with_df(node_list, df) 
f_correlation = pool.map(construct_graph_parallelly, all_pair_with_df)
pool.close()
print("DONE")

But when I run the code it runs forever and never prints "DONE".

One of the problems may be the global-variable issue discussed in Globals variables and Python multiprocessing.
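A stripped-down sketch of that issue (made-up names, not my real code): each worker mutates its own copy of a plain global dictionary, so the parent's dictionary stays empty.

from multiprocessing import Pool

shared = {}  # plain global in the parent process

def worker(i):
    # each worker process gets its own copy of `shared`;
    # this write never reaches the parent's dictionary
    shared[i] = i * i
    return i

if __name__ == '__main__':
    with Pool() as pool:
        pool.map(worker, range(4))
    print(shared)  # prints {} in the parent, not {0: 0, 1: 1, 2: 4, 3: 9}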

But for my work, I need to update the dictionary and the connected graph globally.

How can I do this, or what modifications should I make so that this works?


Solution

  • Update: Let's be a bit less ambitious and use multiprocessing just to build the f_correlation dictionary.

    With your current code each process has its own copy of the global variables. You should use sharable, managed types (see multiprocessing.SyncManager). For example:

    import networkx as nx
    from multiprocessing import Pool, Manager
    
    # initialize this process's global variables:
    def pool_initializer(the_dict):
        # initialize global variable with shared, managed dictionary
        global f_correlation
        f_correlation = the_dict
    
    def construct_graph_parallelly(pair_with_df):
        global f_correlation
        pair, df = pair_with_df
        i, j = pair
        # calculate the edge value and store it in the global variable f_correlation
        f_correlation[(i, j)] = calculate_value(df, i, j)    # this function calculates some value on the dataframe
    
    def main():    
        with Manager() as manager: # create SyncManager instance
            f_correlation = manager.dict() # create managed, shared dictionary
            # node_list and df are assumed to be defined as in the question
            with Pool(initializer=pool_initializer, initargs=(f_correlation,)) as pool:
                all_pair_with_df = make_all_pair_with_df(node_list, df) 
                pool.map(construct_graph_parallelly, all_pair_with_df)
                # now build graph
                G = nx.Graph()
                for k, v in f_correlation.items():
                    i, j = k # unpack
                    G.add_edge(i, j, weight=v)
        
    if __name__ == '__main__': # required for Windows
        main()
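
    A design note: every read and write on the manager.dict() proxy is an inter-process round trip. If each worker only has to produce a single edge value, a minimal alternative sketch (assuming calculate_value, make_all_pair_with_df, node_list and df are defined as in the question, and using a hypothetical worker edge_value) is to return the value from the worker and let the parent assemble both the dictionary and the graph from pool.map()'s result list:

    import networkx as nx
    from multiprocessing import Pool

    def edge_value(pair_with_df):
        # worker: compute one edge's value and hand it back to the parent
        (i, j), df = pair_with_df
        return (i, j), calculate_value(df, i, j)

    def main():
        all_pair_with_df = make_all_pair_with_df(node_list, df)
        with Pool() as pool:
            results = pool.map(edge_value, all_pair_with_df)
        f_correlation = dict(results)   # {(node1, node2): edge_value}
        G = nx.Graph()
        for (i, j), v in f_correlation.items():
            G.add_edge(i, j, weight=v)
        return G, f_correlation

    if __name__ == '__main__':
        G, f_correlation = main()
        print("DONE")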