首页 > 解决方案 > 用于随机数生成的自定义 numpy(或 scipy?)概率分布

问题描述

问题

Tl; dr:我想要一个函数,它在类似于“高斯”和均匀分布之和的概率分布之后的间隔中随机返回一个浮点数(或可选的浮点数数组)。

函数(或类) - 比方说custom_distr()- 应该作为输入(已经给出默认值):

输出是标量或 ndarray,具体取决于大小。

必须对输出进行缩放以证明累积分布等于 1(我不确定如何执行此操作)。

请注意,我遵循numpy.random.Generator的命名约定uniformnormal发行版作为参考,但命名和使用的包对我来说并不重要。

我试过的

由于我找不到直接“添加”numpy.random.Generator的均匀分布和高斯分布的方法,因此我尝试使用scipy.stats.rv_continuous子类化,但我被困在如何定义_rvs方法或_ppf使其快速的方法上。

根据我对Github 中 rv_continuous 类定义的理解,_rvs使用numpy's random.RandomState(与 相比已过时random.Generator)进行分发。这似乎违背了使用scipy.stats.rv_continuous子类化的目的。

另一种选择是定义_ppf我的自定义分布的百分比函数,因为根据Github 中的 rv_generic 类定义,默认函数_rvs使用_ppf. 但是我在手动定义这个函数时遇到了麻烦。

接下来是一个 MWE,使用low=0.0high=1.0loc=0.3进行测试scale=0.02。名称与“问题numpy”部分不同,因为和之间的术语术语不同scipy

import numpy as np
from scipy.stats import rv_continuous
import scipy.special as sc
import matplotlib.pyplot as plt
import time


# The class definition
class custom_distr(rv_continuous):
    def __init__(self, my_loc=0.5, my_scale=0.5, a=0.0, b=1.0, *args, **kwargs):
        super(custom_distr, self).__init__(a, b, *args, **kwargs)
        self.a = a
        self.b = b
        self.my_loc = my_loc
        self.my_scale = my_scale

    def _pdf(self, x):
        # uniform distribution
        aux = 1/(self.b-self.a)
        # gaussian distribution
        aux += 1/np.sqrt(2*np.pi*self.my_scale**2) * \
                 np.exp(-(x-self.my_loc)**2/2/self.my_scale**2)
        return aux/2  # divide by 2?

    def _cdf(self, x):
        # uniform distribution
        aux = (x-self.a)/(self.b-self.a)
        # gaussian distribution
        aux += 0.5*(1+sc.erf((x-self.my_loc)/(self.my_scale*np.sqrt(2))))
        return aux/2  # divide by 2?


# Testing the class
if __name__ == "__main__":
    my_cust_distr = custom_distr(name="my_dist", my_loc=0.3, my_scale=0.02)

    x = np.linspace(0.0, 1.0, 10000)

    start_t = time.time()
    the_pdf = my_cust_distr.pdf(x)
    print("PDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_pdf, label='pdf')

    start_t = time.time()
    the_cdf = my_cust_distr.cdf(x)
    print("CDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_cdf, 'r', alpha=0.8, label='cdf')

    # Get 10000 random values according to the custom distribution
    start_t = time.time()
    r = my_cust_distr.rvs(size=10000)
    print("RVS calc time: {:4.4f}".format(time.time()-start_t))

    plt.hist(r, density=True, histtype='stepfilled', alpha=0.3, bins=40)

    plt.ylim([0.0, the_pdf.max()])
    plt.grid(which='both')
    plt.legend()

    print("Maximum of CDF is: {:2.1f}".format(the_cdf[-1]))

    plt.show()

生成的图像是: 在此处输入图像描述

输出是:

PDF calc time: 0.0010
CDF calc time: 0.0010
RVS calc time: 11.1120
Maximum of CDF is: 1.0

在我的方法中,计算 RVS 方法的时间太慢了。

标签: pythonnumpyrandomscipyprobability-distribution

解决方案


根据 Wikipedia,当 cdf 单调增加时,ppf 或百分比点函数(也称为分位数函数)可以写为累积分布函数 (cdf) 的反函数。

从问题中显示的图中,我的自定义分布函数的 cdf 确实单调增加 - 正如预期的那样,因为高斯分布和均匀分布的 cdf 也是如此。

一般正态分布的ppf可以在这个维基百科页面的“四分位函数”下找到。a并且定义在和之间的均匀函数的 ppfb可以简单地计算为p*(b-a)+a,其中p是所需的概率。

但是两个函数之和的反函数(通常)不能简单地写成逆函数!有关更多信息,请参阅此数学交流帖子

因此,到目前为止,我发现的部分“解决方案”是在实例化一个对象时保存一个包含我的自定义分布的 cdf 的数组,然后通过一维插值找到 ppf(即 cdf 的反函数),它只能作为只要 cdf 确实是一个单调递增的函数。

注意 1:我还没有解决 Peter O. 提到的边界检查问题。

注 2:如果给出 's 的 ndarray,建议的解决方案是不可行的loc,因为四分位函数缺少封闭形式的表达式。因此,最初的问题仍然悬而未决。

现在的工作代码是:

import numpy as np
from scipy.stats import rv_continuous
import scipy.special as sc
import matplotlib.pyplot as plt
import time


# The class definition
class custom_distr(rv_continuous):
    def __init__(self, my_loc=0.5, my_scale=0.5, a=0.0, b=1.0,
                 init_ppf=1000, *args, **kwargs):
        super(custom_distr, self).__init__(a, b, *args, **kwargs)
        self.a = a
        self.b = b
        self.my_loc = my_loc
        self.my_scale = my_scale
        self.x = np.linspace(a, b, init_ppf)
        self.cdf_arr = self._cdf(self.x)

    def _pdf(self, x):
        # uniform distribution
        aux = 1/(self.b-self.a)
        # gaussian distribution
        aux += 1/np.sqrt(2*np.pi)/self.my_scale * \
                 np.exp(-0.5*((x-self.my_loc)/self.my_scale)**2)
        return aux/2  # divide by 2?

    def _cdf(self, x):
        # uniform distribution
        aux = (x-self.a)/(self.b-self.a)
        # gaussian distribution
        aux += 0.5*(1+sc.erf((x-self.my_loc)/(self.my_scale*np.sqrt(2))))
        return aux/2  # divide by 2?

    def _ppf(self, p):
        if np.any((p<0.0) | (p>1.0)):
            raise RuntimeError("Quantile function accepts only values between 0 and 1")
        return np.interp(p, self.cdf_arr, self.x)


# Testing the class
if __name__ == "__main__":
    a = 1.0
    b = 3.0
    my_loc = 1.5
    my_scale = 0.02

    my_cust_distr = custom_distr(name="my_dist", a=a, b=b,
                                 my_loc=my_loc, my_scale=my_scale)

    x = np.linspace(a, b, 10000)

    start_t = time.time()
    the_pdf = my_cust_distr.pdf(x)
    print("PDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_pdf, label='pdf')

    start_t = time.time()
    the_cdf = my_cust_distr.cdf(x)
    print("CDF calc time: {:4.4f}".format(time.time()-start_t))
    plt.plot(x, the_cdf, 'r', alpha=0.8, label='cdf')

    start_t = time.time()
    r = my_cust_distr.rvs(size=10000)
    print("RVS calc time: {:4.4f}".format(time.time()-start_t))

    plt.hist(r, density=True, histtype='stepfilled', alpha=0.3, bins=100)

    plt.ylim([0.0, the_pdf.max()])
    # plt.xlim([a, b])
    plt.grid(which='both')
    plt.legend()

    print("Maximum of CDF is: {:2.1f}".format(the_cdf[-1]))

    plt.show()

生成的图像是: 生成的图像

输出是:

PDF calc time: 0.0010
CDF calc time: 0.0010
RVS calc time: 0.0010
Maximum of CDF is: 1.0

代码比以前更快,但代价是使用了更多内存。


推荐阅读