Search code examples
python-3.xnumpynormal-distributiontruncation

numpy vectorise value generation from a truncated gaussian distribution


I've a function which generates a value from a truncated normal distribution, with a while loop which ensures that any generated value that lies outside the truncation gets discarded and replaced with another generation until it lies within the range.

def gen_truncated(minimum, maximum, ave, sigma):
    # min=0.9, max=1, 
    x = 0.
    while x < minimum or x > maximum:
        x = np.random.normal(0,1)*sigma+ave

    return x

How can I vectorise this function in such a way that x is now an array of many x values, generated in such a way that there's always a while loop ensuring that the array elements are regenerated so long as the condition of x < minimum and x > maximum is reached? Is there a vectorise way of comparing every element of x to a number i.e. minimum or maximum?

Edit: What if I have more constraints that need to be satisfied? Ultimately I'm looking to vectorise the generation of a 4x4 matrix that's generated through several contraints, the constraint in gen_truncated() is just one of the many. I have a gen_sigma() which first generates 3 values lambda1, lambda2, lambda3, now lambda3 again needs to satisfy several conditions w.r.t the values of lambda1 and lambda2 otherwise they get redrawn. Once they're right, all three values gets fed into get_tau() to generate 3 values. Again these tau values need to satisfy some more constraints, otherwise they are thrown away and are generated again until they're right. Ultimately, they form a 4x4 matrix called sigma_gen, which are multiplied left and right by create_rotation() through gen_channel to create a single 4x4 matrix channel.

import numpy as np
from numpy.linalg import norm

def gen_sigma(minimum, maximum, ave, sigma):
    lambda1 = gen_truncated(minimum, maximum, ave, sigma)
    lambda2 = gen_truncated(minimum, maximum, ave, sigma)
    lambda3 = gen_truncated(minimum, maximum, ave, sigma)

    while 1+lambda3 < abs(lambda1+lambda2) or 1-lambda3 < abs(lambda2-lambda1):
        lambda3 = gen_truncated(minimum, maximum, ave, sigma)

    tau = get_tau(lambda1, lambda2, lambda3)
    lambdas = [lambda1, lambda2, lambda3]
    while (norm(tau)**2 >
           1-sum([x**2 for x in [lambda1, lambda2, lambda3]]) +
           2*lambda1*lambda2*lambda3) or (z_eta(tau, lambdas) < 0):
        tau = get_tau(lambda1, lambda2, lambda3)

    sigma_gen = np.array([[     1,       0, 0, 0],
                          [tau[0], lambda1, 0, 0],
                          [tau[1], 0, lambda2, 0],
                          [tau[2], 0, 0, lambda3]])

    return sigma_gen

def get_tau(einval1, einval2, einval3):
    max_tau1 = 1 - abs(einval1)
    max_tau2 = 1 - abs(einval2)
    max_tau3 = 1 - abs(einval3)
    tau1 = max_tau1*(2*np.random.uniform(0,1)-1)
    tau2 = max_tau2*(2*np.random.uniform(0,1)-1)
    tau3 = max_tau3*(2*np.random.uniform(0,1)-1)

    return [tau1, tau2, tau3]

def z_eta(t: np.ndarray, l: np.ndarray):
    condition = (norm(t)**4 - 2*norm(t)**2 -
                 2*sum([(l[i]**2)*(2*(t[i]**2-norm(t)**2)) for i in range(3)])+
                 q(l))
    return condition

def q(e: np.ndarray):
    # e are the eigenvalues
    return (1+e[0]+e[1]+e[2])*(1+e[0]-e[1]-e[2])*(1-e[0]+e[1]-e[2])*(1-e[0]-e[1]+e[2])

def create_rotation(angles: np.ndarray) -> np.ndarray:
    "random rotation in PL form"
    # input np.random.normal(0,1,3)*0.06
    rotation = np.eye(4, dtype=complex)
    left = np.array([[ np.cos(angles[0]), np.sin(angles[0]), 0],
                     [-np.sin(angles[0]), np.cos(angles[0]), 0],
                     [                 0,                 0, 1]])
    mid = np.array([[1,                 0,                 0],
                    [0, np.cos(angles[1]), np.sin(angles[1])],
                    [0, -np.sin(angles[1]), np.cos(angles[1])]])
    right = np.array([[ np.cos(angles[2]), np.sin(angles[2]), 0],
                      [-np.sin(angles[2]), np.cos(angles[2]), 0],
                      [                 0,                 0, 1]])
    rotation[1:4,1:4] = left@mid@right

    return rotation

def gen_channel(r1, r2, ave, sigma):
    rand1 = np.random.normal(0,1,3)
    rand2 = np.random.normal(0,1,3)
    channel = create_rotation(rand1*r1)@gen_sigma(0.9, 1, ave, sigma)@\
              create_rotation(rand2*r2)
    return channel

An example run of a channel

gen_channel(0.05, 0.05, 0.98, 0.15)

would give for example

Out[140]: 
array([[ 1.        +0.j,  0.        +0.j,  0.        +0.j,
         0.        +0.j],
       [-0.05828008+0.j,  0.91805971+0.j,  0.14291751+0.j,
        -0.00946994+0.j],
       [-0.00509449+0.j, -0.14170308+0.j,  0.90034613+0.j,
        -0.11548884+0.j],
       [ 0.0467522 +0.j, -0.00851749+0.j,  0.11450963+0.j,
         0.90259637+0.j]])

Now if I want to create say 100 of these 4x4 matrix I'll have to use list comprehension i.e.

np.array([gen_channel(0.05, 0.05, 0.98, 0.15) for i in range(100)])

which will run through all the constraints comparison and create the 4x4 matrices one by one. Now my very original question was motivated by the fact that I want to vectorise them, so rather than comparing one value at a time, just generate an array of values using numpy broadcast and check the constraints such that I have a vectorise version of gen_channel which generates 100 such 4x4 matrices without the need of list comprehension. The list comprehension way contains the repeated use of generating a single random number which leads to a bottleneck in its runspeed. What I want to do is just generate arrays of random numbers, do those checks, then generate array of 4x4 channels so as to reduce the bottleneck.


Solution

  • You could draw a large sample from the original distribution, then determine which entries lie in the correct range, and then draw from that:

    # parameters
    ave, sigma = 0,1
    minimum, maximum = 0.9, 1
    
    # draw sample and specify which entries are ok
    a = np.random.normal(ave, sigma, 100000)
    index = (a > minimum) & (a < maximum)
    
    # draw from subset
    np.random.choice(a[index], 1000, replace=False)
    

    Using timeit

    on the above code:

    %%timeit -r 10 -n 10 
    2.51 ms ± 87.5 µs per loop (mean ± std. dev. of 10 runs, 10 loops each)
    

    on the original in a loop:

    %%timeit -r 10 -n 10
    
    for i in range(1000):
        gen_truncated(0.9,1, 0, 1)
    
    88.5 ms ± 1.24 ms per loop (mean ± std. dev. of 10 runs, 10 loops each)