numpy - 是否可以使用 pyspark 来加快对非常大的数组的每一列的回归分析?
问题描述
我有一个非常大的数组。我想对数组的每一列进行线性回归。为了加快计算速度,我创建了一个列表,将数组的每一列作为其元素。然后我使用 pyspark 创建了一个 RDD,并在其上进一步应用了定义的函数。我在创建 RDD(即并行化)时遇到了内存问题。
我试图通过设置 spark-defaults.conf 将 spark.driver.memory 提高到 50g,但程序似乎仍然死机。
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error
from pyspark import SparkContext
sc = SparkContext("local", "get Linear Coefficients")
def getLinearCoefficients(column):
y=column[~np.isnan(column)] # Extract column non-nan values
x=np.where(~np.isnan(column))[0]+1 # Extract corresponding indexs plus 1
# We only do linear regression interpolation when there are no less than 3 data pairs exist.
if y.shape[0]>=3:
model=LinearRegression(fit_intercept=True) # Intilialize linear regression model
model.fit(x[:,np.newaxis],y) # Fit the model using data
n=y.shape[0]
slope=model.coef_[0]
intercept=model.intercept_
r2=r2_score(y,model.predict(x[:,np.newaxis]))
rmse=np.sqrt(mean_squared_error(y,model.predict(x[:,np.newaxis])))
else:
n,slope,intercept,r2,rmse=np.nan,np.nan,np.nan,np.nan,np.nan
return n,slope,intercept,r2,rmse
random_array=np.random.rand(300,2000*2000) # Here we use a random array without missing data for testing purpose.
columns=[col for col in random_array.T]
columnsRDD=sc.parallelize(columns)
columnsLinearRDD=columnsRDD.map(getLinearCoefficients)
n=np.array([e[0] for e in columnsLinearRDD.collect()])
slope=np.array([e[1] for e in columnsLinearRDD.collect()])
intercept=np.array([e[2] for e in columnsLinearRDD.collect()])
r2=np.array([e[3] for e in columnsLinearRDD.collect()])
rmse=np.array([e[4] for e in columnsLinearRDD.collect()])
程序输出停滞不前,如下所示。
Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError
at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123)
at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:117)
at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:153)
at org.apache.spark.util.ByteBufferOutputStream.write(ByteBufferOutputStream.scala:41)
at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1877)
at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(ObjectOutputStream.java:1786)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1189)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:43)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:486)
at org.apache.spark.scheduler.TaskSetManager$$anonfun$resourceOffer$1.apply(TaskSetManager.scala:467)
at scala.Option.map(Option.scala:146)
at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:467)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:315)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:310)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:412)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4$$anonfun$apply$11.apply(TaskSchedulerImpl.scala:409)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:409)
at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$4.apply(TaskSchedulerImpl.scala:396)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:396)
at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalSchedulerBackend.scala:86)
at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalSchedulerBackend.scala:64)
at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
我想可以使用 pyspark 来加快计算速度,但我该怎么做呢?修改 spark-defaults.conf 中的其他参数?或者向量化数组的每一列(我知道 Python3 中的 range() 函数就是这样做的,它真的更快。)?
解决方案
那是行不通的。你基本上在做三件事:
- 您正在使用 RDD 进行并行化,
- 你正在调用你的 getLinearCoefficients() 函数,最后
- 您在其上调用collect()以使用您现有的代码。
第一点没有错,但是第二步和第三步就出现了巨大的错误。您的 getLinearCoefficients() 函数不会从 pyspark 中受益,因为您使用 numpy 和 sklearn(看看这篇文章为了更好的解释)。对于您使用的大多数功能,都有一个 pyspark 等价物。第三步的问题是 collect() 函数。当您调用 collect() 时,pyspark 会将 RDD 的所有行带到驱动程序并在那里执行 sklearn 函数。因此,您只能获得 sklearn 允许的并行化。以您目前的方式使用 pyspark 完全没有意义,甚至可能是一个缺点。Pyspark 不是一个允许您并行运行 python 代码的框架。当您想与 pyspark 并行执行代码时,您必须使用 pyspark 函数。
那你能做什么?
推荐阅读
- c# - C# Textbox.Text 转换为 SQL SmallMoney
- visual-studio - Visual Studio 在调试期间缓存旧版本的程序集
- html - 在图像 html 上创建文本
- visual-studio - 尝试在 SharePoint 中访问 quicklinks.aspx 时收到拒绝访问错误
- amazon-web-services - 安装 SSl 后 AWS EC2 操作超时
- python - Flask-Mysqldb NameError:未定义名称'_mysql'
- java - 更改 Spring 日志记录格式
- javascript - 无法读取未定义承诺错误的属性“解析器”
- php - 为什么我的路线在 Laravel 8 中不能完美运行?
- json - 找不到将 json 发送到后端的 Angular 错误 404