scala - 如何在 Scala 中总结这两个 Spark Dataframes?
问题描述
我正在熟悉 Spark 和 Scala,我目前的任务是“总结”这两个 Dataframe:
+---+--------+-------------------+
|cyl|avg(mpg)| var_samp(mpg)|
+---+--------+-------------------+
| 8| 15.8| 1.0200000000000014|
| 6| 20.9|0.48999999999999966|
| 4| 33.9| 0.0|
+---+--------+-------------------+
+---+------------------+------------------+
|cyl| avg(mpg)| var_samp(mpg)|
+---+------------------+------------------+
| 8| 13.75| 6.746999999999998|
| 6| 21.4| NaN|
+---+------------------+------------------+
在这种情况下,“键”是cyl
和“值”avg(mpg)
和var_samp(mpg)
。
这两个的(近似)结果将是:
+---+--------+-------------------+
|cyl|avg(mpg)| var_samp(mpg)|
+---+--------+-------------------+
| 8| 29.55| 7.76712|
| 6| 42.3|0.48999999999999966|
| 4| 33.9| 0.0|
+---+--------+-------------------+
请注意如何NaN
被认为是零,以及某些 DataFrame 中可能缺少“键”(第二个中缺少 4 个键)。
我怀疑reduceByKey
是去这里的方式,但不能让它工作。
到目前为止,这是我的代码:
case class Cars(car: String, mpg: String, cyl: String, disp: String, hp: String,
drat: String, wt: String, qsec: String, vs: String, am: String, gear: String, carb: String)
object Bootstrapping extends App {
override def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Spark and SparkSql").setMaster("local")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
// Exploring SparkSQL
// Initialize an SQLContext
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
import sqlContext.implicits._
// Load a cvs file
val csv = sc.textFile("mtcars.csv")
// Create a Spark DataFrame
val headerAndRows = csv.map(line => line.split(",").map(_.trim))
val header = headerAndRows.first
val mtcdata = headerAndRows.filter(_(0) != header(0))
val mtcars = mtcdata
.map(p => Cars(p(0), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10), p(11)))
.toDF
// Aggregate data after grouping by columns
import org.apache.spark.sql.functions._
mtcars.sort($"cyl").show()
mtcars.groupBy("cyl").agg(avg("mpg"), var_samp("mpg")).sort($"cyl").show()
//sample 25% of the population without replacement
val sampledData = mtcars.sample(false, 0.25)
//bootstrapping loop
for (a <- 1 to 5) {
//get bootstrap sample
val bootstrapSample = sampledData.sample(true, 1)
//HERE!!! I WANT TO SAVE THE AGGREGATED SUM OF THE FOLLOWING:
bootstrapSample.groupBy("cyl").agg(avg("mpg"), var_samp("mpg"))
}
}
}
这是我正在使用的数据:Motor Trend Car Road Tests
解决方案
一种方法是对union
两个 DataFrame,使用when/otherwise
to translateNaN
和 performgroupBy
来聚合sum
列的 s,如下所示:
import org.apache.spark.sql.functions._
import spark.implicits._
val df1 = Seq(
(8, 15.8, 1.0200000000000014),
(6, 20.9, 0.48999999999999966),
(4, 33.9, 0.0)
).toDF("cyl", "avg_mpg", "var_samp_mpg")
val df2 = Seq(
(8, 13.75, 6.746999999999998),
(6, 21.4, Double.NaN)
).toDF("cyl", "avg_mpg", "var_samp_mpg")
(df1 union df2).
withColumn("var_samp_mpg", when($"var_samp_mpg".isNaN, 0.0).otherwise($"var_samp_mpg")).
groupBy("cyl").agg(sum("avg_mpg"), sum("var_samp_mpg")).
show
// +---+------------+-------------------+
// |cyl|sum(avg_mpg)| sum(var_samp_mpg)|
// +---+------------+-------------------+
// | 6| 42.3|0.48999999999999966|
// | 4| 33.9| 0.0|
// | 8| 29.55| 7.7669999999999995|
// +---+------------+-------------------+
推荐阅读
- javascript - HTTP 请求正文返回空
- python - 如何在不覆盖python中先前的多行的情况下将多个输出行写入文本文件?
- python - Python 中(命名)元组的字典和速度/RAM 性能
- json - 使用 python 和 pandas 遍历具有不同类别名称的 API
- ios - 由于 Cocoapods 安装失败,iOS 框架添加失败
- graphql - 如何从解析的graphql中删除不需要的节点?
- angular - 我们如何从我们在 Firestore 安全规则中使用的 Angular Firestore 传递 uid
- python - 在 keras 中禁用 tensorflow 回溯警告
- racket - 是否可以将方法参数绑定到 Racket 中的任意位置?
- xcode - 找不到“Cordova/CDVUIWebViewDelegate.h”文件