Search code examples
pythonnumpyrandomscipyprobability-distribution

Custom numpy (or scipy?) probability distribution for random number generation


The issue

Tl;dr: I would like a function that randomly returns a float (or optionally an ndarray of floats) in an interval following a probability distribution that resembles the sum of a "Gaussian" and a uniform distributions.

The function (or class) - let's say custom_distr() - should have as inputs (with default values already given):

  • the lower and upper bounds of the interval: low=0.0, high=1.0
  • the mean and standard deviation parameters of the "Gaussian": loc=0.5, scale=0.02
  • the size of the output: size=None
  • size can be an integer or a tuple of integers. If so, then loc and scale can either both simultaneously be scalars, or ndarrays whose shape corresponds to size.

The output is a scalar or an ndarray, depending on size.

The output has to be scaled to certify that the cumulative distribution is equal to 1 (I'm uncertain how to do this).

Note that I'm following numpy.random.Generator's naming convention from uniform and normal distributions as reference, but the nomenclature and the utilized packages does not really matter to me.

What I've tried

Since I couldn't find a way to "add" numpy.random.Generator's uniform and Gaussian distributions directly, I've tried using scipy.stats.rv_continuous subclassing, but I'm stuck at how to define the _rvs method, or the _ppf method to make it fast.

From what I've understood of rv_continuous class definition in Github, _rvs uses numpy's random.RandomState (which is out of date in comparison to random.Generator) to make the distributions. This seems to defeat the purpose of using scipy.stats.rv_continuous subclassing.

Another option would be to define _ppf, the percent-point function of my custom distribution, since according to rv_generic class definition in Github, the default function _rvs uses _ppf. But I'm having trouble defining this function by hand.

Following, there is a MWE, tested using low=0.0, high=1.0, loc=0.3 and scale=0.02. The names are different than the "The issue" section, because terminologies of terms are different between numpy and scipy.

import numpy as np
from scipy.stats import rv_continuous
import scipy.special as sc
import matplotlib.pyplot as plt
import time


# The class definition
class custom_distr(rv_continuous):
    def __init__(self, my_loc=0.5, my_scale=0.5, a=0.0, b=1.0, *args, **kwargs):
        super(custom_distr, self).__init__(a, b, *args, **kwargs)
        self.a = a
        self.b = b
        self.my_loc = my_loc
        self.my_scale = my_scale

    def _pdf(self, x):
        # uniform distribution
        aux = 1/(self.b-self.a)
        # gaussian distribution
        aux += 1/np.sqrt(2*np.pi*self.my_scale**2) * \
                 np.exp(-(x-self.my_loc)**2/2/self.my_scale**2)
        return aux/2  # divide by 2?

    def _cdf(self, x):
        # uniform distribution
        aux = (x-self.a)/(self.b-self.a)
        # gaussian distribution
        aux += 0.5*(1+sc.erf((x-self.my_loc)/(self.my_scale*np.sqrt(2))))
        return aux/2  # divide by 2?


# Testing the class
if __name__ == "__main__":
    my_cust_distr = custom_distr(name="my_dist", my_loc=0.3, my_scale=0.02)

    x = np.linspace(0.0, 1.0, 10000)

    start_t = time.time()
    the_pdf = my_cust_distr.pdf(x)
    print("PDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_pdf, label='pdf')

    start_t = time.time()
    the_cdf = my_cust_distr.cdf(x)
    print("CDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_cdf, 'r', alpha=0.8, label='cdf')

    # Get 10000 random values according to the custom distribution
    start_t = time.time()
    r = my_cust_distr.rvs(size=10000)
    print("RVS calc time: {:4.4f}".format(time.time()-start_t))

    plt.hist(r, density=True, histtype='stepfilled', alpha=0.3, bins=40)

    plt.ylim([0.0, the_pdf.max()])
    plt.grid(which='both')
    plt.legend()

    print("Maximum of CDF is: {:2.1f}".format(the_cdf[-1]))

    plt.show()

The generated image is: enter image description here

The output is:

PDF calc time: 0.0010
CDF calc time: 0.0010
RVS calc time: 11.1120
Maximum of CDF is: 1.0

The time computing the RVS method is too slow in my approach.


Solution

  • According to Wikipedia, the ppf, or percent-point function (also called the Quantile function), can be written as the inverse function of the cumulative distribution function (cdf), when the cdf increases monotonically.

    From the figure shown in the question, the cdf of my custom distribution function does, indeed, increase monotonically - as is expected, since the cdf's of Gaussian and uniform distributions do so too.

    The ppf of the general normal distribution can be found in this Wikipedia page under "Quartile function". And the ppf of a uniform function defined between a and b can be calculated simply as p*(b-a)+a, where p is the desired probability.

    But the inverse function of the sum of two functions, cannot (in general) be trivially written as a function of the inverses! See this Mathematics Exchange post for more information.

    Therefore, the partial "solution" I have found thus far is to save an array containing the cdf of my custom distribution when instantiating an object and then finding the ppf (i.e. the inverse function of the cdf) via 1D interpolation, which only works as long as the cdf is indeed a monotonically increasing function.

    NOTE 1: I still haven't fixed the bound's check issue mentioned by Peter O.

    NOTE 2: The proposed solution is unviable if an ndarray of loc's were given, because of the lack of a closed-form expression for the Quartile function. Therefore, the original question is still open.

    The working code is now:

    import numpy as np
    from scipy.stats import rv_continuous
    import scipy.special as sc
    import matplotlib.pyplot as plt
    import time
    
    
    # The class definition
    class custom_distr(rv_continuous):
        def __init__(self, my_loc=0.5, my_scale=0.5, a=0.0, b=1.0,
                     init_ppf=1000, *args, **kwargs):
            super(custom_distr, self).__init__(a, b, *args, **kwargs)
            self.a = a
            self.b = b
            self.my_loc = my_loc
            self.my_scale = my_scale
            self.x = np.linspace(a, b, init_ppf)
            self.cdf_arr = self._cdf(self.x)
    
        def _pdf(self, x):
            # uniform distribution
            aux = 1/(self.b-self.a)
            # gaussian distribution
            aux += 1/np.sqrt(2*np.pi)/self.my_scale * \
                     np.exp(-0.5*((x-self.my_loc)/self.my_scale)**2)
            return aux/2  # divide by 2?
    
        def _cdf(self, x):
            # uniform distribution
            aux = (x-self.a)/(self.b-self.a)
            # gaussian distribution
            aux += 0.5*(1+sc.erf((x-self.my_loc)/(self.my_scale*np.sqrt(2))))
            return aux/2  # divide by 2?
    
        def _ppf(self, p):
            if np.any((p<0.0) | (p>1.0)):
                raise RuntimeError("Quantile function accepts only values between 0 and 1")
            return np.interp(p, self.cdf_arr, self.x)
    
    
    # Testing the class
    if __name__ == "__main__":
        a = 1.0
        b = 3.0
        my_loc = 1.5
        my_scale = 0.02
    
        my_cust_distr = custom_distr(name="my_dist", a=a, b=b,
                                     my_loc=my_loc, my_scale=my_scale)
    
        x = np.linspace(a, b, 10000)
    
        start_t = time.time()
        the_pdf = my_cust_distr.pdf(x)
        print("PDF calc time: {:4.4f}".format(time.time()-start_t))
        plt.plot(x, the_pdf, label='pdf')
    
        start_t = time.time()
        the_cdf = my_cust_distr.cdf(x)
        print("CDF calc time: {:4.4f}".format(time.time()-start_t))
        plt.plot(x, the_cdf, 'r', alpha=0.8, label='cdf')
    
        start_t = time.time()
        r = my_cust_distr.rvs(size=10000)
        print("RVS calc time: {:4.4f}".format(time.time()-start_t))
    
        plt.hist(r, density=True, histtype='stepfilled', alpha=0.3, bins=100)
    
        plt.ylim([0.0, the_pdf.max()])
        # plt.xlim([a, b])
        plt.grid(which='both')
        plt.legend()
    
        print("Maximum of CDF is: {:2.1f}".format(the_cdf[-1]))
    
        plt.show()
    

    The resulting image is: Generated image

    And the output is:

    PDF calc time: 0.0010
    CDF calc time: 0.0010
    RVS calc time: 0.0010
    Maximum of CDF is: 1.0
    

    The code is faster than before, at the cost of using a bit more memory.