首页 > 解决方案 > 是否可以使用 pyspark 来加快对非常大的数组的每一列的回归分析?

问题描述

我有一个非常大的数组。我想对数组的每一列进行线性回归。为了加快计算速度,我创建了一个列表,将数组的每一列作为其元素。然后我使用 pyspark 创建了一个 RDD,并在其上进一步应用了定义的函数。我在创建 RDD(即并行化)时遇到了内存问题。

我试图通过设置 spark-defaults.conf 将 spark.driver.memory 提高到 50g,但程序似乎仍然死机。

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from pyspark import SparkContext
sc = SparkContext("local", "get Linear Coefficients")

def getLinearCoefficients(column):
    y=column[~np.isnan(column)] # Extract column non-nan values
    x=np.where(~np.isnan(column))[0]+1 # Extract corresponding indexs plus 1
    # We only do linear regression interpolation when there are no less than 3 data pairs exist.
    if y.shape[0]>=3:
        model=LinearRegression(fit_intercept=True) # Intilialize linear regression model
        model.fit(x[:,np.newaxis],y) # Fit the model using data
        n=y.shape[0]
        slope=model.coef_[0]
        intercept=model.intercept_
        r2=r2_score(y,model.predict(x[:,np.newaxis]))
        rmse=np.sqrt(mean_squared_error(y,model.predict(x[:,np.newaxis])))
    else:
        n,slope,intercept,r2,rmse=np.nan,np.nan,np.nan,np.nan,np.nan
    return n,slope,intercept,r2,rmse

random_array=np.random.rand(300,2000*2000) # Here we use a random array without missing data for testing purpose.
columns=[col for col in random_array.T]
columnsRDD=sc.parallelize(columns)
columnsLinearRDD=columnsRDD.map(getLinearCoefficients)
n=np.array([e[0] for e in columnsLinearRDD.collect()])
slope=np.array([e[1] for e in columnsLinearRDD.collect()])
intercept=np.array([e[2] for e in columnsLinearRDD.collect()])
r2=np.array([e[3] for e in columnsLinearRDD.collect()])
rmse=np.array([e[4] for e in columnsLinearRDD.collect()])

程序输出停滞不前,如下所示。

Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError
        at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
        at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
        at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
        at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
        at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
        at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
        at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:486)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:467)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:315)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:412)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:409)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:409)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:396)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396)
        at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86)
        at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

我想可以使用 pyspark 来加快计算速度,但我该怎么做呢?修改 spark-defaults.conf 中的其他参数?或者向量化数组的每一列(我知道 Python3 中的 range() 函数就是这样做的,它真的更快。)?

标签: numpypyspark

解决方案


那是行不通的。你基本上在做三件事:

  1. 您正在使用 RDD 进行并行化,
  2. 你正在调用你的 getLinearCoefficients() 函数,最后
  3. 您在其上调用collect()以使用您现有的代码。

第一点没有错,但是第二步和第三步就出现了巨大的错误。您的 getLinearCoefficients() 函数不会从 pyspark 中受益,因为您使用 numpy 和 sklearn(看看这篇文章为了更好的解释)。对于您使用的大多数功能,都有一个 pyspark 等价物。第三步的问题是 collect() 函数。当您调用 collect() 时,pyspark 会将 RDD 的所有行带到驱动程序并在那里执行 sklearn 函数。因此,您只能获得 sklearn 允许的并行化。以您目前的方式使用 pyspark 完全没有意义,甚至可能是一个缺点。Pyspark 不是一个允许您并行运行 python 代码的框架。当您想与 pyspark 并行执行代码时,您必须使用 pyspark 函数。

那你能做什么?

  • 首先,您可以使用 LinearRegression 类的n_jobs参数来使用多个核心进行计算。这使您至少可以使用一台机器的所有内核。
  • 您可以做的另一件事是离开 sklearn 并使用 pyspark 的线性回归(查看指南api)。有了这个,您可以使用整个集群进行线性回归。

推荐阅读