Search code examples
Tags: python, mpi, mpi4py

I can't figure out why the while loop in this MPI code doesn't break


I'm doing a parallelization exercise using mpi4py in which 2 dice are thrown a defined number of times (the total is divided among the processes, i.e., each process throws npp times) and the resulting sums are counted. The results are stored in a dictionary and the mean deviation from the theoretical distribution is calculated; as long as mean_dev is not below 0.001, the number of throws is doubled and the simulation is repeated.

All of this works as expected; the problem is that the program doesn't quit. The condition is met and there is no more output, but the program hangs.

from ctypes.wintypes import SIZE
from dice import * #This is just a class that creates the dictionaries 
from random import randint
import matplotlib.pyplot as plt
import numpy as np
from mpi4py import MPI
from math import sqrt

def simulation(f_events, f_sides, f_n_dice):
    """Simulate throwing ``f_n_dice`` dice ``f_events`` times and count the outcomes.

    Args:
        f_events: Number of throws (events) to simulate.
        f_sides: Number of sides of each die; every face is drawn with
            ``randint(1, f_sides)``.
        f_n_dice: Number of dice thrown per event.

    Returns:
        The nested dictionary produced by ``dice(f_sides, f_n_dice).myDice()``
        with its counters updated: entries ``1..f_n_dice`` count the faces of
        each die, entry ``f_n_dice + 1`` counts the sum of every throw.
    """
    # Build the counter table from the arguments, not from module-level
    # globals, so the table always matches the dice actually being thrown.
    f_X = dice(f_sides, f_n_dice).myDice()
    sum_key = f_n_dice + 1  # Key of the dictionary holding the per-throw sums
    for _ in range(f_events):
        sum_throw = 0
        for i in range(1, f_n_dice + 1):
            k = randint(1, f_sides)
            f_X[i][k] += 1  # Face k came up on die i
            sum_throw += k
        f_X[sum_key][sum_throw] += 1
    return f_X

# Driver script (version from the question, which hangs): every MPI rank
# simulates npp throws per round, rank 0 pools the sums and measures how far
# the pooled counts are from the theoretical distribution.
npp = int(4)//4 #Number of events divided by the number of processes
sides = 6 #Number of sides per dice
n_dice = 2 #Number of dice

comm = MPI.COMM_WORLD #World communicator; used below for the bcast/gather collectives
rank = comm.Get_rank() #Rank (id) of this process in the communicator
size = comm.Get_size() #Number of processes

#-------------------- Parallelization portion of the code --------------------#

seq = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
AUX = dict.fromkeys(seq, 0) #Running count per sum value; only updated on rank 0
mean_dev = 1 #Start above the threshold so the first round always runs
while True:
    #msg is only printed; the simulation below uses each rank's own npp
    msg = comm.bcast(npp, root = 0)
    print("---> msg: ", msg, " for rank ", rank)
    print("The mean dev for %d" %rank + " is: ", mean_dev)

    D = simulation(npp, sides, n_dice)
    
    #Collective: rank 0 receives a list with one result per rank, the others get None
    Dp = comm.gather(D, root = 0)
    print("This is Dp: ", Dp)
    
    summ = 0
    #Theoretical probabilities of the sums 2..12 for two six-sided dice (indexed by sum-2)
    prob = [1/36, 2/36, 3/36, 4/36, 5/36, 6/36, 5/36, 4/36, 3/36, 2/36, 1/36]

    if rank==0:
        for p in range(0, size): 
                for n in range(dice().min, dice().max+1): #Range from minimum sum possible to the maximum sum possible depending on the number of dice used
                    AUX[n] += Dp[p][n_dice+1][n] #Adds the sum counts of rank p to the running totals
                                                                #(entry n_dice+1 of the nested dictionary holds the sums)
                print(Dp[p][n_dice+1])
    
        print("The final dictionary is: ", AUX, sum(AUX[j] for j in AUX))

        for i in range(dice().min, dice().max+1):
            exp = (prob[i-2])*(sum(AUX[j] for j in AUX)) #Expected count for sum i
            x = (AUX[i]-exp)/exp #Relative deviation of the observed count
            summ = summ + pow(x, 2)

        mean_dev = (1/11)*sqrt(summ)
        print("The deviation for {} is {}.".format(sum(AUX[j] for j in AUX), mean_dev))

    #NOTE: this is where the hang comes from. mean_dev is only reassigned inside
    #the rank==0 branch above, so on every other rank it keeps its initial value
    #of 1 and this test never fails there: rank 0 breaks, the other ranks loop
    #again and block in the next collective. mean_dev has to be broadcast from
    #rank 0 before it is tested.
    if mean_dev > 0.001:
        npp = 2*npp
        # new_msg = comm.bcast(npp, root = 0)
        # print("---> new_msg: ", new_msg, " for rank ", rank)
    else:
        break
        

I'm stumped on this one. Thanks in advance for any input!


The new code with the solution proposed by @victor-eijkhout:

from ctypes.wintypes import SIZE
from dice import *
from random import randint
import matplotlib.pyplot as plt
import numpy as np
from mpi4py import MPI
from math import sqrt

def simulation(f_events, f_sides, f_n_dice):
    """Throw ``f_n_dice`` dice ``f_events`` times and tally faces and sums.

    Args:
        f_events: How many throws (events) to simulate.
        f_sides: Sides per die; each face is drawn with ``randint(1, f_sides)``.
        f_n_dice: How many dice are thrown in each event.

    Returns:
        The nested dictionary obtained from ``dice(f_sides, f_n_dice).myDice()``
        after counting: keys ``1..f_n_dice`` hold the face counts of each die
        and key ``f_n_dice + 1`` holds the counts of the per-throw sums.
    """
    # Use the parameters rather than the module-level ``sides``/``n_dice`` so
    # the counters are sized for the dice this call actually simulates.
    f_X = dice(f_sides, f_n_dice).myDice()
    for _ in range(f_events):
        faces = [randint(1, f_sides) for _ in range(f_n_dice)]
        # Dice are numbered from 1 in the nested dictionary.
        for i, k in enumerate(faces, start=1):
            f_X[i][k] += 1
        f_X[f_n_dice + 1][sum(faces)] += 1
    return f_X

# Driver script: every MPI rank simulates npp throws per round, rank 0 pools
# the sums, computes the mean deviation from the theoretical distribution and
# broadcasts it so that all ranks leave the loop together.
npp = int(4)//4 #Number of events divided by the number of processes
sides = 6 #Number of sides per dice
n_dice = 2 #Number of dice

comm = MPI.COMM_WORLD #World communicator; used below for the bcast/gather collectives
rank = comm.Get_rank() #Rank (id) of this process in the communicator
size = comm.Get_size() #Number of processes

#-------------------- Parallelization portion of the code --------------------#

seq = (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
AUX = dict.fromkeys(seq, 0) #Running count per sum value; only updated on rank 0
mean_dev = 1 #Start above the threshold so the first round always runs
while True:
    #Take the throw count from rank 0 instead of trusting every rank to have
    #doubled its private copy in lockstep.
    npp = comm.bcast(npp, root = 0)

    D = simulation(npp, sides, n_dice)

    #Collective: rank 0 receives a list with one result per rank, the others get None
    Dp = comm.gather(D, root = 0)

    if rank==0:
        summ = 0
        #Theoretical probabilities of the sums 2..12 for two six-sided dice (indexed by sum-2)
        prob = [1/36, 2/36, 3/36, 4/36, 5/36, 6/36, 5/36, 4/36, 3/36, 2/36, 1/36]

        for p in range(0, size):
            #Range from the minimum to the maximum possible sum; presumably 2..12
            #here, which is what the keys of AUX and the prob table assume.
            for n in range(dice().min, dice().max+1):
                AUX[n] += Dp[p][n_dice+1][n] #Entry n_dice+1 of the nested dictionary holds the sums
            print(Dp[p][n_dice+1])

        total = sum(AUX.values()) #Throws accumulated over all ranks and rounds
        print("The final dictionary is: ", AUX, total)

        for i in range(dice().min, dice().max+1):
            exp = prob[i-2]*total #Expected count for sum i
            x = (AUX[i]-exp)/exp #Relative deviation of the observed count
            summ = summ + pow(x, 2)

        mean_dev = (1/11)*sqrt(summ)
        print("The deviation for {} is {}.".format(total, mean_dev))

    #Only rank 0 knows the real deviation; broadcasting it lets every rank take
    #the same branch below, so no rank is left waiting in a collective.
    new_mean_dev = comm.bcast(mean_dev, root = 0)
    print("---> msg2: ", new_mean_dev, " for rank ", rank)

    if new_mean_dev < 0.001:
        break

    npp = 2*npp
    print("The new npp is: ", npp)

Git repo with other parallelization problems and solutions: https://github.com/davidmcarreira/parallel-computing


Solution

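  • You are computing the mean deviation only on process zero, so only that process sees the condition become true and exits. The other processes never receive the value, so they keep looping and then block in the next collective call, which process zero no longer takes part in. You should broadcast the value after you compute it and have every process test the broadcast value.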