I am trying to use shared memory across multiple processes to update a dictionary that contains dictionaries which contain dictionaries... I tried using the Manager from the multiprocessing module, but I am having difficulty adding a dictionary to it. Please see the code and the comments below. Essentially this code is supposed to create a copy of the input in another dictionary called "output." Once I get this working, there will be logic to only copy certain "blades" from the input, but the node/cluster/blade hierarchy must be maintained.
from multiprocessing import Process, Lock, Manager
# Define dictionary that will be used in this example
input = {
"Node_1": {
"IP": "127.0.0.1",
"clusters": {
"cluster_1": {
"blades": {
"blade_0_1": "127.0.1.1",
"blade_0_2": "127.0.1.2"
}
},
"cluster_2": {
"blades": {
"blade_0_3": "127.0.1.3",
"blade_0_4": "127.0.1.4"
}
}
}
},
"Node_2": {
"IP": "127.0.0.2",
"clusters": {
"cluster_1": {
"blades": {
"blade_0_1": "127.0.1.1",
"blade_0_2": "127.0.1.2"
}
},
"cluster_2": {
"blades": {
"blade_0_3": "127.0.1.3",
"blade_0_4": "127.0.1.4"
}
}
}
}
}
def iterate_over_clusters_in_node(input, node, lock, output):
""" Iterate over the clusters in the node, then over the blades in the cluster. Add each blade to the output dictionary."""
for cluster in input[node]['clusters']:
for blade in input[node]['clusters'][cluster]['blades']:
with lock:
print "node: " + node + ", node_IP: " + input[node]['IP'] + ", cluster: " + cluster + ", Blade: " + blade + ", cluster_IP: " + input[node]['clusters'][cluster]['blades'][blade]
with lock:
add_blade_to_output(input, node, cluster, blade, output)
def add_blade_to_output(input, node, cluster, blade, output):
''' Adds a blade to the managed output dictionary'''
if node not in output:
output[node] = {}
output[node]['IP'] = input[node]['IP']
output[node]['clusters'] = {}
# At this point, I would expect output[node]['IP'] and output[node]['clusters'] to exist
# But the following print raises KeyError: 'IP'
print output[node]['IP']
if cluster not in output[node]['clusters']:
# Raises KeyError: 'clusters'
output[node]['clusters'][cluster] = {}
output[node]['clusters'][cluster]['blades'] = {}
output[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
if __name__ == "__main__":
# Create lock to ensure correct handling of output from multiple processes
lock = Lock()
# Create dictionary to hold any failed blades so that appropriate measures can be taken
# Must use a Manager so that the dictionary can be shared among processes
manager = Manager()
output = manager.dict()
# Create array to track our processes
procs = []
# Iterate over all nodes in input
for node in input:
p = Process(target = iterate_over_clusters_in_node, args = (input, node, lock, output))
p.start()
procs.append(p)
# Join processes and wait for them to stop
for p in procs:
p.join()
print "The final output is:"
print output
# Expectation: should print the same dictionary as the input
# Actual: prints "{'Node_2': {}, 'Node_1': {}}"
Do I need to be adding manager.dict() to output[node] instead of the built-in dictionary type? Or am I going about this all wrong?
Thanks!
Edit: I am not against switching this to a "threading" implementation instead of "multiprocessing." I'm new to running things in parallel, so if threading is better suited for this type of memory sharing, please let me know.
Edit: Here is the working code:
from multiprocessing import Process, Lock, Manager
# Define dictionary that will be used in this example
input = {
"Node_1": {
"IP": "127.0.0.1",
"clusters": {
"cluster_1": {
"blades": {
"blade_0_1": "127.0.1.1",
"blade_0_2": "127.0.1.2"
}
},
"cluster_2": {
"blades": {
"blade_0_3": "127.0.1.3",
"blade_0_4": "127.0.1.4"
}
}
}
},
"Node_2": {
"IP": "127.0.0.2",
"clusters": {
"cluster_1": {
"blades": {
"blade_0_1": "127.0.1.1",
"blade_0_2": "127.0.1.2"
}
},
"cluster_2": {
"blades": {
"blade_0_3": "127.0.1.3",
"blade_0_4": "127.0.1.4"
}
}
}
}
}
# Create dictionary to hold any failed blades so that appropriate measures can be taken
# Must use a Manager so that the dictionary can be shared among processes
manager = Manager()
output = manager.dict()
def iterate_over_clusters_in_node(input, node, lock):
""" Iterate over the clusters in the node, then over the blades in the cluster. Add each blade to the output dictionary."""
for cluster in input[node]['clusters']:
for blade in input[node]['clusters'][cluster]['blades']:
with lock:
add_blade_to_output(input, node, cluster, blade)
def add_blade_to_output(input, node, cluster, blade):
''' Adds a blade to the managed output dictionary'''
if node not in output:
new_node = {}
new_node[node] = {'IP': input[node]['IP'], 'clusters': {}}
output.update(new_node)
new_node = {}
new_node.update(output)
if cluster not in output[node]['clusters']:
new_node[node]['clusters'][cluster] = {}
new_node[node]['clusters'][cluster]['blades'] = {blade: input[node]['clusters'][cluster]['blades'][blade]}
else:
new_node[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
output.update(new_node)
if __name__ == "__main__":
# Create lock to ensure correct handling of output from multiple processes
lock = Lock()
# Create array to track our processes
procs = []
# Iterate over all nodes in input
for node in input:
p = Process(target = iterate_over_clusters_in_node, args = (input, node, lock))
p.start()
procs.append(p)
# Join processes and wait for them to stop
for p in procs:
p.join()
print "The final output is:"
print output
According to the python docs,
Modifications to mutable values or items in dict and list proxies will not be propagated through the manager, because the proxy has no way of knowing when its values or items are modified. To modify such an item, you can re-assign the modified object to the container proxy.
With this information, we can update the manager accordingly:
#output[node] = {}
#output[node]['IP'] = input[node]['IP']
#output[node]['clusters'] = {} These changes are not propagated through the manager
new_node = {}
new_node[node] = {'IP': input[node]['IP'], 'clusters': {}}
output.update(new_node)
#if cluster not in output[node]['clusters']:
# Raises KeyError: 'clusters'
#output[node]['clusters'][cluster] = {}
#output[node]['clusters'][cluster]['blades'] = {}
#output[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
node_copy = output.copy()
if cluster not in node_copy[node]['clusters']:
node_copy[node]['clusters'].setdefault(cluster, {'blades': {}})
node_copy[node]['clusters'][cluster]['blades'][blade] = input[node]['clusters'][cluster]['blades'][blade]
output.update(node_copy)