首页 > 技术文章 > MapReduce计算线性回归的系数估计值

littlefatsheep 2017-12-13 14:16 原文

1. 先修知识

设多元线性回归方程的模型为

\[Y=\beta_0+\beta_1X_1+\beta_2X_2+\cdots+\beta_pX_p \]

可令\(X_0=1\),则模型可写做:

\[Y=\beta_0X_0+\beta_1X_1+\beta_2X_2+\cdots+\beta_pX_p \]

表示成矩阵形式为:

\[Y=\beta X \]

其中,

\[\beta = \left[\begin{matrix} \beta_0 \\ \beta_1 \\ \beta_2 \\ \vdots \\ \beta_p \end{matrix} \right] X= \left[\begin{matrix} &x_{10} &x_{11} &\cdots &x_{1p} \\ &x_{20} &x_{21} &\cdots &x_{2p} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{n0} &x_{n1} &\cdots &x_{np} \end{matrix}\right] \]

则线性回归方程的最小二乘系数估计值为:

\[\hat{\beta} = (X'X)^{-1}X'Y \]

2. 编程思路

利用MapReduce计算系数是,由于输入数据是一行一行进行读取的,因此在计算的时候,不可能直接利用矩阵乘法进行计算。这里,我们假设输入的数据格式为:

\[x_0\ x_1\ x_2\ \cdots x_p\ y \]

\[1\ x_1\ x_2\ \cdots x_p\ y \]

首先,考虑最简单的,计算\(X'Y\),即计算

\[\left[\begin{matrix} &x_{10} &x_{20} &x_{30} &\cdots &x_{n0} \\ &x_{11} &x_{21} &x_{31} &\cdots &x_{n1} \\ &x_{12} &x_{22} &x_{32} &\cdots &x_{n2} \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p} &x_{2p} &x_{3p} &\cdots &x_{np} \end{matrix}\right] \times \left[\begin{matrix} y_1 \\ y_2 \\ y_3 \\ \vdots \\ y_n \end{matrix}\right]=\left[\begin{matrix} &x_{10}y_1+&x_{20}y_2+&x_{30}y_3+&\cdots+&x_{n0}y_n \\ &x_{11}y_1+&x_{21}y_2+&x_{31}y_3+&\cdots+&x_{n1}y_n \\ &x_{12}y_1+&x_{22}y_2+&x_{32}y_3+&\cdots+&x_{n2}y_n \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p}y_1+&x_{2p}y_2+&x_{3p}y_3+&\cdots+&x_{np}y_n \end{matrix}\right] \]

观察右侧矩阵可以发现,\(y_1\)总是与第一列相乘,\(y_2\)总是与第二列相乘,……,以此类推。而第一列实际上就是我们读取数据的第一行,第二列是读取数据的第二行……。根据这种规律,我们每读取一行,就让当前的\(y\)与所有的自变量相乘,然后把所有的结果累加求和,即为我们想要的结果。

上面已经解决了矩阵自变量矩阵的转置与因变量向量的乘积,即:

\[part1 = X'Y \]

那么,矩阵转置与矩阵的乘积\(X'X\)仍然可以仿照上述的方法进行。

\[X'X = \left[\begin{matrix} &x_{10} &x_{20} &x_{30} &\cdots &x_{n0} \\ &x_{11} &x_{21} &x_{31} &\cdots &x_{n1} \\ &x_{12} &x_{22} &x_{32} &\cdots &x_{n2} \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p} &x_{2p} &x_{3p} &\cdots &x_{np} \end{matrix}\right] \times\left[\begin{matrix} &x_{10} &x_{11} &\cdots &x_{1p} \\ &x_{20} &x_{21} &\cdots &x_{2p} \\ &x_{30} &x_{31} &\cdots &x_{2p} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{n0} &x_{n1} &\cdots &x_{np} \end{matrix} \right] \]


\[=\left[\begin{matrix} &x_{10} &x_{20} &x_{30} &\cdots &x_{n0} \\ &x_{11} &x_{21} &x_{31} &\cdots &x_{n1} \\ &x_{12} &x_{22} &x_{32} &\cdots &x_{n2} \\ &\vdots &\vdots &\vdots &\ddots &\vdots \\ &x_{1p} &x_{2p} &x_{3p} &\cdots &x_{np} \end{matrix}\right]\times\left[\begin{matrix} x_0, x_1, x_2, \cdots, x_p \end{matrix}\right] \]

可以把上述既然课程看作是进行了\(p+1\)次的矩阵转置与向量乘积过程。当读取第一行时,我们可以得到一个矩阵:

\[step1 = \left[\begin{matrix} &x_{10}x_{10} &x_{11}x_{10} &\cdots &x_{1p}x_{10} \\ &x_{10}x_{11} &x_{11}x_{11} &\cdots &x_{1p}x_{11} \\ &x_{10}x_{12} &x_{11}x_{12} &\cdots &x_{1p}x_{12} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{10}x_{1p} &x_{11}x_{1p} &\cdots &x_{1p}x_{1p} \end{matrix}\right]\]

同理,当读取第二行的时候,同样可以得到一个矩阵:

\[step2 = \left[ \begin{matrix} &x_{20}x_{20} &x_{21}x_{20} &\cdots &x_{2p}x_{20} \\ &x_{20}x_{21} &x_{21}x_{21} &\cdots &x_{2p}x_{21} \\ &x_{20}x_{22} &x_{21}x_{12} &\cdots &x_{2p}x_{12} \\ &\vdots &\vdots &\ddots &\vdots \\ &x_{20}x_{2p} &x_{21}x_{2p} &\cdots &x_{2p}x_{2p} \end{matrix}\right]\]

此时,将\(step1\ step2\)对应元素相加,即得到我们的更新矩阵。迭代下去,最终读取完所在的数据,即为\(X'X\)的结果。

当数据量很大的时候,用MapReduce方法进行计算,hadoop会将数据按照block的大小切分成若干块,每一块都执行mapper函数,在reducer里面把所有的mapper结果加起来,最后计算\((X'X)^{-1}X'Y\).

3. 编程实现

由于矩阵的输出不容易处理,因此在得到矩阵的时候,可以将其拉长为向量,这样就可以使用标准化输出函数print将计算结果输出,以便于reducer使用。

mapper函数如下:

#! /usr/bin/anaconda2/bin/python
# -*- coding:UTF-8 -*-

import sys
import numpy as np

def read_input(file):
    for line in file:
        yield line.strip()

def matmulti():
    
    input = read_input(sys.stdin)

    innerLength = 0
    length = 0
    for line in input:
        fields = line.split(",")
        if innerLength == 0:
            innerLength = len(fields) - 1
            data1 = np.array([0.0 for _ in range(innerLength)])
        temp = np.array(fields,float)[:innerLength]*float(fields[innerLength])
        data1 = data1 + temp
        
        if length == 0:
            length = len(fields) - 1
            data2 = np.diag(np.zeros(length))
        for index in range(length):
            data2[index] += np.array(fields[:length],dtype = float)*float(fields[index])
            
    return data1,data2

data1,data2 = matmulti()
data1 = list(data1)
length = len(data2)
data2 = data2.reshape(1,length**2)
data2 = list(data2[0])
data = data1 + data2
print("\t".join(str(i) for i in data))

reducer函数如下:

#! /usr/bin/anaconda2/bin/python
# -*- coding:UTF-8 -*-

import sys
import numpy as np
import math

def read_input(file):
    for line in file:
        yield line.strip()

input = read_input(sys.stdin)

length = 0

for line in input:
    fields = line.split("\t")
    if length == 0:
        length = len(fields)
        data = np.array([0.0 for _ in range(length)])
    fields = np.array(fields,dtype = float)
    data += fields

lenght = len(data)
varnums = int((-1+math.sqrt(1+4.0*length))/2.0)

part1 = np.mat(data[:varnums])
part1 = part1.T

part2 = data[varnums:]
part2 = part2.reshape(varnums,varnums)
part2 = np.mat(part2)
part2 = part2.I

result = part2*part1

print result

用R随机生成一份样本量为100万的数据,R回归结果为:

Coefficients:
              Estimate Std. Error t value Pr(>|t|)    
(Intercept) 19.9995945  0.0602316 332.045   <2e-16 ***
V2          -0.0012168  0.0009998  -1.217   0.2236    
V3          -0.0018043  0.0009990  -1.806   0.0709 .  
V4           0.0017635  0.0009990   1.765   0.0775 .  
V5          -0.0021161  0.0009987  -2.119   0.0341 *  
V6          -0.0002982  0.0009997  -0.298   0.7655    
V7           0.0008608  0.0009998   0.861   0.3892    
V8           0.0007678  0.0009995   0.768   0.4423    
V9           0.0009089  0.0009991   0.910   0.3630    
V10          0.0009761  0.0009991   0.977   0.3286 

MapReduce的输出结果为:

[[  1.99995945e+01]	
 [ -1.21684122e-03]	
 [ -1.80428969e-03]	
 [  1.76352620e-03]	
 [ -2.11610460e-03]	
 [ -2.98208998e-04]	
 [  8.60846240e-04]	
 [  7.67848910e-04]	
 [  9.08866936e-04]	
 [  9.76052909e-04]]

二者的计算结果是一样的。

推荐阅读