I want to build a fully connected graph in parallel in Python and also collect the edge values, ( node1, node2 ) = edge_value, stored in a dictionary of the form:
{ ( node1, node2 ) : edge_value [, ... [, ... ] ] }
To do this, I first initialize two global variables: G for the graph and f_correlation for that dictionary:
import networkx as nx
from multiprocessing import Pool
G = nx.Graph()
f_correlation = {}
I then define a function that adds an edge to the graph and also stores ( node1, node2 ) = edge_value in the f_correlation dictionary:
def construct_graph_parallelly(pair_with_df):
    global G
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the global variable f_correlation
    f_correlation[(i, j)] = calculate_value(df, i, j)  # this function calculates some value on the dataframe
    # here i, j are nodes in the graph
    G.add_edge(i, j, weight=f_correlation[(i, j)])
    return f_correlation
Then a multiprocessing.Pool() instance is created and its .map() method is called so that the work runs concurrently:
def make_all_pair_with_df(node_list, df):
    all_pair_with_df = []
    for i in node_list:
        for j in node_list:
            if i != j:
                pair_with_df = (i, j), df
                all_pair_with_df.append(pair_with_df)
    return all_pair_with_df
node_list = ['a', 'b', 'c', 'd', 'e']
pool = Pool()
all_pair_with_df = make_all_pair_with_df(node_list, df)
f_correlation = pool.map(construct_graph_parallelly, all_pair_with_df)
pool.close()
print("DONE")
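The nested loops in make_all_pair_with_df produce every ordered pair of distinct nodes, so both ('a', 'b') and ('b', 'a') become separate tasks. A minimal sketch of the same pair generation with the standard library's itertools is below; make_pairs is a hypothetical helper name, and the symmetric=True default assumes (this is not stated in the question) that calculate_value(df, i, j) equals calculate_value(df, j, i), as a correlation would. Since nx.Graph is undirected, that assumption lets itertools.combinations emit each unordered pair once and halves the number of calculate_value calls.

```python
from itertools import combinations, permutations


def make_pairs(node_list, df, symmetric=True):
    """Hypothetical helper: build ((i, j), df) work items for pool.map.

    symmetric=True ASSUMES calculate_value(df, i, j) == calculate_value(df, j, i),
    so each unordered pair is emitted once. symmetric=False emits every ordered
    pair, in the same order as the original nested loops.
    """
    pair_iter = combinations(node_list, 2) if symmetric else permutations(node_list, 2)
    return [(pair, df) for pair in pair_iter]


node_list = ['a', 'b', 'c', 'd', 'e']
print(len(make_pairs(node_list, None)))                   # 10 unordered pairs
print(len(make_pairs(node_list, None, symmetric=False)))  # 20 ordered pairs, like the nested loops
```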
But when I run the code, it runs indefinitely and never prints "DONE".
One cause may be the global-variable problem discussed in "Globals variables and Python multiprocessing". But for my work, I need to update both the dictionary and the connected graph globally.
How can I do this, or what changes should I make to get it working?
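The global-variable problem can be seen in isolation with a minimal sketch: each worker process writes to its own copy of a module-level dictionary, so the parent's dictionary never changes. The names results and store are hypothetical and stand in for f_correlation and construct_graph_parallelly.

```python
from multiprocessing import Pool

# Module-level "global" dict, playing the role of f_correlation in the question.
results = {}


def store(key):
    # Runs in a worker process and mutates that worker's own copy of `results`.
    results[key] = key * 2
    return len(results)  # size of this worker's copy after the write


if __name__ == '__main__':
    with Pool(2) as pool:
        sizes = pool.map(store, range(4))
    print(sizes)    # non-zero sizes: the workers' private copies did grow
    print(results)  # {}: the parent's dictionary was never touched
```

The same applies to G: edges added inside a worker land in that worker's copy of the graph, not in the parent's.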
Update: Let's be a bit less ambitious and use multiprocessing just to build the f_correlation dictionary.
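For the reduced goal in the update, one option (a different technique from the shared-dictionary approach, named here plainly) is to share nothing: each worker returns its ((i, j), value) result, and pool.map hands all results back to the parent as a list, one per work item, in input order. The sketch below assumes a hypothetical stand-in calculate_value and a plain dict in place of the real DataFrame; edge_value and build_f_correlation are hypothetical names, and combinations again assumes a symmetric edge value.

```python
from itertools import combinations
from multiprocessing import Pool


def calculate_value(df, i, j):
    # Hypothetical stand-in for the question's calculate_value(df, i, j);
    # here df is just a dict of node -> number, not a real DataFrame.
    return df[i] * df[j]


def edge_value(pair_with_df):
    # Worker: compute one edge value and *return* it instead of writing to a global.
    (i, j), df = pair_with_df
    return (i, j), calculate_value(df, i, j)


def build_f_correlation(node_list, df, processes=None):
    # ASSUMES a symmetric edge value, so each unordered pair is computed once.
    work = [((i, j), df) for i, j in combinations(node_list, 2)]
    with Pool(processes) as pool:
        # pool.map returns one ((i, j), value) tuple per work item, in input order;
        # dict() turns that list into {(i, j): value}.
        return dict(pool.map(edge_value, work))


if __name__ == '__main__':
    df = {'a': 1, 'b': 2, 'c': 3}
    f_correlation = build_f_correlation(['a', 'b', 'c'], df)
    print(f_correlation)  # {('a', 'b'): 2, ('a', 'c'): 3, ('b', 'c'): 6}
```

The graph can then be built in the parent, for example with G.add_weighted_edges_from((i, j, w) for (i, j), w in f_correlation.items()). Only the results cross the process boundary, and each write avoids the inter-process round trip that a managed dictionary needs.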
With your current code, each process has its own copy of the global variables. You should use shareable, managed types (see multiprocessing.SyncManager). For example:
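import networkx as nx
from multiprocessing import Pool, Manager

# calculate_value, make_all_pair_with_df, node_list and df are as defined in the question.

# initialize this process's global variables:
def pool_initializer(the_dict):
    # initialize global variable with shared, managed dictionary
    global f_correlation
    f_correlation = the_dict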
def construct_graph_parallelly(pair_with_df):
    global f_correlation
    pair, df = pair_with_df
    i, j = pair
    # calculate the edge value and store it in the shared dictionary f_correlation
    f_correlation[(i, j)] = calculate_value(df, i, j)  # this function calculates some value on the dataframe
def main():
    with Manager() as manager:  # create SyncManager instance
        f_correlation = manager.dict()  # create managed, shared dictionary
        # code to initialize G omitted
        with Pool(initializer=pool_initializer, initargs=(f_correlation,)) as pool:
            all_pair_with_df = make_all_pair_with_df(node_list, df)
            pool.map(construct_graph_parallelly, all_pair_with_df)
        # now build the graph, still inside the Manager block:
        # the managed dictionary is no longer usable once the manager shuts down
        G = nx.Graph()
        for k, v in f_correlation.items():
            i, j = k  # unpack
            G.add_edge(i, j, weight=v)
    return G
if __name__ == '__main__':  # required wherever the spawn start method is used (Windows, and macOS since Python 3.8)
    G = main()
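A self-contained, runnable version of the managed-dictionary approach is sketched below so it can be tried without networkx or a DataFrame. It assumes a hypothetical stand-in calculate_value and a plain dict for df, and it takes node_list and df as parameters of main instead of globals; the name shared is also illustrative. It copies the managed dictionary into an ordinary dict before the Manager block exits, because the proxy stops working once the manager process shuts down.

```python
from itertools import combinations
from multiprocessing import Manager, Pool


def calculate_value(df, i, j):
    # Hypothetical stand-in for the question's calculate_value(df, i, j).
    return df[i] + df[j]


def pool_initializer(the_dict):
    # Runs once in each worker: bind the shared, managed dict to a module global.
    global f_correlation
    f_correlation = the_dict


def construct_graph_parallelly(pair_with_df):
    (i, j), df = pair_with_df
    # Each assignment is forwarded to the manager process, so the parent sees it.
    f_correlation[(i, j)] = calculate_value(df, i, j)


def main(node_list, df):
    with Manager() as manager:
        shared = manager.dict()
        # ASSUMES a symmetric edge value, so each unordered pair is computed once.
        work = [(pair, df) for pair in combinations(node_list, 2)]
        with Pool(initializer=pool_initializer, initargs=(shared,)) as pool:
            pool.map(construct_graph_parallelly, work)
        # Copy into a plain dict while the manager is still running.
        return shared.copy()


if __name__ == '__main__':
    result = main(['a', 'b', 'c'], {'a': 1, 'b': 2, 'c': 3})
    print(sorted(result.items()))  # [(('a', 'b'), 3), (('a', 'c'), 4), (('b', 'c'), 5)]
```

The returned plain dict can then be fed to nx.Graph in the parent exactly as in main() above.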