首页 > 解决方案 > pybind11 与 numpy 的矩阵乘积

问题描述

编辑 2(2018 年 6 月 18 日。)

我使用了Matrix建议的课程

http://pybind11.readthedocs.io/en/stable/advanced/pycpp/numpy.html

使用Matrix我实现的产品如下:

Matrix product3(const Matrix &s1, const Matrix &s2) // M = M1 x M2
{
    size_t rowsM1 = s1.rows();
    size_t colsM1 = s1.cols();
    size_t rowsM2 = s2.rows();
    size_t colsM2 = s2.cols();
    assert(colsM1 == rowsM2);
    size_t resDim = rowsM1 * colsM2;
    double * ptr = new double[resDim];
    cblas_dgemm(CblasRowMajor, CblasNoTrans, CblasNoTrans, rowsM1, colsM2, colsM1, 1.0, s1.data(), rowsM1, s2.data(), colsM2, 0.0, ptr, std::max(rowsM1, colsM2));
    Matrix res(rowsM1, colsM2, ptr);
    return res;
}

在发布时(在核心 i7 6700 HQ 上)结果如下:

在此处输入图像描述

确实比py::array_t<double>'s好很多。图表(纵坐标为秒,横坐标为方阵大小): 在此处输入图像描述

在 intel mkl 下,numpy 相对来说是一点点。在大小区域 [1500,1600] 中两者都有显着下降,mkl 更陡峭。可以注意到,随着矩阵大小的增加,“numpy time / intel time”这个因素正在减少。

这次在核心 i7-7700K 上:

在此处输入图像描述

测试python代码是:

import Binder
import numpy as np
import matplotlib.pyplot as plt
import time

rangeMin = 100
rangeMax = 2000
step = 100

X = []
intel = []
numpy = []

for size in range(rangeMin, rangeMax, step):

    X.append(size)

    m1 = np.array(np.random.rand(size,size), copy = False).astype(np.float64)
    M1 = Binder.Matrix(m1)
    m2 = np.array(np.random.rand(size,size), copy = False).astype(np.float64)
    M2 = Binder.Matrix(m2)

    M = Binder.Matrix(size,size)
    N = np.array([size,size])

    #M.print()

    loopSize = 50

    start_time = time.time()
    for x in range(1, loopSize):
        N = m1 @ m2
    time_elapsed = (time.time() - start_time)/loopSize

    print("Size =\t" + repr(size) + "\tnumpy Time =\t" + repr(time_elapsed))
    numpy.append(time_elapsed)

    start_time = time.time()
    for x in range(1, loopSize):
        M = Binder.product3(M1,M2);
    time_elapsed = (time.time() - start_time)/loopSize

    print("Size =\t" + repr(size) + "\tintel Time =\t" + repr(time_elapsed))
    intel.append(time_elapsed)

fig = plt.figure()
ax1 = fig.add_subplot(111)

ax1.scatter(X, numpy, s=10, c='b', marker="s", label='numpy')
ax1.scatter(X, intel, s=10, c='r', marker="o", label='intel')
plt.legend(loc='upper left');
plt.show()

编辑 1(2018 年 6 月 16 日。)

我尝试了同样的方法,这次使用 intel mkl,将for初始代码的循环替换为

cblas_dgemm(CblasRowMajor, CblasNoTrans, CblasNoTrans, nbRows1, nbCols2, nbCols1, 1.0, ptr1, nbRows1, ptr2, nbCols2, 0.0, ptr, nbRows1);

初始代码在英特尔酷睿 i5 4570 上运行。这次在英特尔酷睿 i7 6700 HQ 上运行所有三个案例,结果如下:

在此处输入图像描述

两个备注

1) 对于相同的 Python 3.6.5 32 位,在我的笔记本电脑的核心 i7 上使用 numpy 的 Python 比在我在工作中使用的旧核心 i5 桌面上要慢。朴素的 c++ 稍微快一点。很奇怪。

2)在核心 i7 上,c++ intel mkl vs numpy 的因子是 3.41


最初的问题

我写了这个非常幼稚的 c++ pybind11 代码:

py::array product1(py::array_t<double> m1, py::array_t<double> m2)
{
    py::buffer_info info1 = m1.request();
    double * ptr1 = static_cast<double *>(info1.ptr);

    py::buffer_info info2 = m2.request();
    double * ptr2 = static_cast<double *>(info2.ptr);

    unsigned int nbRows1 = info1.shape[0];
    unsigned int nbCols1 = info1.shape[1];

    unsigned int nbRows2 = info2.shape[0];
    unsigned int nbCols2 = info2.shape[1];

    assert(nbCols1 == nbRows2);

    int resDim = nbRows1 * nbCols2;

    double * ptr = new double[resDim];

    double localSum = 0.0;
    for (int i = 0 ; i < nbRows1; ++i)
    {
        for (int j = 0 ; j < nbCols2; ++j)
        {
            for (int l = 0; l < nbCols1; ++l)
            {
                localSum += ptr1[nbCols1 * i + l] * ptr2[nbCols2 * l + j];
            }
            ptr[nbCols2 * i + j] = localSum;
            localSum = 0.0;
        }
    }
    py::array_t<double> mRes = py::array_t<double>
                                (
                                    py::buffer_info
                                    (
                                        ptr,
                                        sizeof(double), //itemsize
                                        py::format_descriptor<double>::format(),
                                        2, // ndim
                                        std::vector<size_t> { nbRows1, nbCols2 }, // shape
                                        std::vector<size_t> {nbRows1 * sizeof(double), sizeof(double)} // strides
                                    )
                                );
    delete[] ptr;
    return mRes;
}

我比较了执行两个固定 500*500 随机生成矩阵的乘积的平均(500 个产品)时间,得到以下结果:

python with numpy :    0.0067s
python with pybind11 : 0.7941s

那个 118 因素让我感到惊讶。当然,我没想到在第一次尝试时就击败了 numpy,但是两次平均时间之间的 100 倍让我感到惊讶。c++如果我将 intel mkl 用于产品的一部分或任何其他库,我认为该因素不会得到显着改善。

所以我猜这个因素主要是由numpy数组到py::array_t<double>s的“转换”和逆转换来解释的。

我知道 numpy 依赖于c代码(很快c++就是代码),但我真的很想知道这些转换是如何在 numpy 中完成的。我在 github 上浏览了 numpy 的源代码,但没有找到“编组”部分或 c 产品部分。

标签: pythonc++python-3.xnumpypybind11

解决方案


如果我将 intel mkl 用于产品的 c++ 部分,我认为该因素不会得到显着改善”

绝对会的。MKL 和类似库中的矩阵矩阵乘积 (GEMM) 是有史以来最高度优化的代码之一。它与您的临时循环完全不同。


推荐阅读