scala - Computing the mean, variance, and standard deviation of a training dataset with Spark Scala
Problem description
I have a DataFrame:
+----------------+--------------+---------------+--------------------+-----+--------+
|origin_longitude|dest_longitude|origin_latitude|destination_latitude|speed|Distance|
+----------------+--------------+---------------+--------------------+-----+--------+
|      -7.1732833|    -7.1732833|     32.0414966|          32.0414966|   50|    20.0|
|      -7.1732833|    -7.1732833|     32.0414966|          32.0414966|   40|    2.50|
|      -7.1732833|    -7.1732833|     32.0414966|          32.0414966|   30|     3.0|
|      -7.1732833|    -7.1732833|     32.0414966|          32.0414966|   10|    98.0|
|      -7.1732833|    -7.1732833|     32.0414966|          32.0414966|   10|    3.80|
+----------------+--------------+---------------+--------------------+-----+--------+
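I want to apply the normal distribution to the "Distance" column of the DataFrame. To do that, I first have to split the dataset into training and test data, and then compute the mean and the variance of the training data. This is how I split the data: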
val Array(trainingData, testData) = DF.randomSplit(Array(0.7, 0.3), seed = 1234L)
To compute the mean, I do this:
trainingData.toDF().agg(avg(col("Distance"))).show()
I get this error:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:720)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:173)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:211)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:208)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:169)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:313)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:405)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3482)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3472)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3468)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2581)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2788)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:297)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:334)
at org.apache.spark.sql.Dataset.show(Dataset.scala:816)
at org.apache.spark.sql.Dataset.show(Dataset.scala:775)
at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
at test$.main(test.scala:111)
at test.main(test.scala)
Caused by: java.io.NotSerializableException: scala.runtime.LazyRef
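Should I keep going the way I started? Do you have any idea how I should solve this problem? Thanks.
Solution
I solved the problem by changing the code like this: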
// Cache the source DataFrame (k) before splitting it 70/30 with a fixed seed
val splits = k.cache().randomSplit(Array(0.7, 0.3), seed = 11L)
val training = splits(0)
val test = splits(1)
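Once the split works, the mean, variance, and standard deviation of the training data can be computed in a single aggregation. The following is only a minimal sketch, assuming a local SparkSession and a small stand-in DataFrame that contains just the Distance column; the names DistanceStats and summarize are hypothetical and do not come from the original post:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, col, stddev_samp, var_samp}

// Hypothetical helper object, for illustration only
object DistanceStats {

  // Returns a one-row DataFrame with the mean, sample variance, and
  // sample standard deviation of the "Distance" column
  def summarize(df: DataFrame): DataFrame =
    df.agg(
      avg(col("Distance")).as("mean"),
      var_samp(col("Distance")).as("variance"),
      stddev_samp(col("Distance")).as("stddev")
    )

  def main(args: Array[String]): Unit = {
    // Assumption: a local session; replace with your own cluster settings
    val spark = SparkSession.builder()
      .appName("DistanceStats")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in data: the Distance values from the question
    val df = Seq(20.0, 2.5, 3.0, 98.0, 3.8).toDF("Distance")

    // Same split as in the answer: cache first, then split 70/30
    val Array(training, test) = df.cache().randomSplit(Array(0.7, 0.3), seed = 11L)

    // Statistics are computed on the training split only
    summarize(training).show()

    spark.stop()
  }
}
```

var_samp and stddev_samp are the sample estimators, which divide by n - 1. If the population versions are wanted instead, var_pop and stddev_pop from the same functions object can be swapped in.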