scala - 直方图 - 以并行方式进行
问题描述
+----+----+--------+
| Id | M1 | trx |
+----+----+--------+
| 1 | M1 | 11.35 |
| 2 | M1 | 3.4 |
| 3 | M1 | 10.45 |
| 2 | M1 | 3.95 |
| 3 | M1 | 20.95 |
| 2 | M2 | 25.55 |
| 1 | M2 | 9.95 |
| 2 | M2 | 11.95 |
| 1 | M2 | 9.65 |
| 1 | M2 | 14.54 |
+----+----+--------+
使用上面的数据框,我应该能够使用下面的代码生成如下直方图。 类似的Queston在这里
val (Range,counts) = df
.select(col("trx"))
.rdd.map(r => r.getDouble(0))
.histogram(10)
// Range: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1)
但这里的问题是,如何根据列“M1”并行创建直方图?这意味着我需要为列值 M1 和 M2 输出两个直方图。
解决方案
首先,您需要知道histogram
生成两个单独的顺序作业。一种用于检测数据的最小值和最大值,一种用于计算实际的直方图。您可以使用 Spark UI 进行检查。
我们可以按照相同的方案在任意多的列上构建直方图,只需两个作业。然而,我们不能使用histogram
仅用于处理一个双打集合的函数。我们需要自己去实现。第一份工作很简单。
val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head
然后我们在本地计算直方图的范围。请注意,我对所有列使用相同的范围。它允许在列之间轻松比较结果(通过将它们绘制在同一图上)。不过,每列有不同的范围只是对这段代码的一个小修改。
val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
.scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value
那是第一部分。然后,我们可以使用 UDF 来确定每个值在什么范围内结束,并与 spark 并行计算所有直方图。
val range_index = udf((x : Double) => hist_ranges.lastIndexWhere(x >= _))
val hist_df = df
.withColumn("rangeIndex", range_index('trx))
.groupBy("M1", "rangeIndex")
.count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2| 2| 2|
| M1| 0| 2|
| M2| 5| 1|
| M1| 3| 2|
| M2| 3| 1|
| M1| 7| 1|
| M2| 10| 1|
+---+----------+-----+
作为奖励,您可以使用 RDD API 或通过收集数据帧并在 scala 中修改数据来塑造数据以在本地(在驱动程序内)使用它。
这是使用火花的一种方法,因为这是关于火花的问题;-)
val hist_map = hist_df.rdd
.map(row => row.getAs[String]("M1") ->
(row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
.groupByKey
.mapValues( _.toMap)
.mapValues( hists => (1 to hist_size)
.map(i => hists.getOrElse(i, 0L)).toArray )
.collectAsMap
编辑:如何为每列值建立一个范围:
我们不是计算 M1 的最小值和最大值,而是为列的每个值计算它groupBy
。
val min_max_map = df.groupBy("M1")
.agg(min('trx), max('trx))
.rdd.map(row => row.getAs[String]("M1") ->
(row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
.collectAsMap // maps each column value to a tuple (min, max)
然后我们调整 UDF 以便它使用这个映射,我们就完成了。
// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
val hist_step = (max_trx - min_trx) / hist_size
(1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
.map(key => key ->
generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
.toMap
val range_index = udf((x : Double, m1 : String) =>
range_map(m1).lastIndexWhere(x >= _))
最后,只需替换range_index('trx)
为range_index('trx, 'M1)
,每列值将有一个范围。
推荐阅读
- oauth - 实施从谷歌 Oauth 注销
- jquery - 带有分数的 Swiper 滑块进度条不起作用
- unit-testing - 使用项目反应器重试单元测试
- javascript - 我不知道如何将来自 document.write 的输入存储在变量中
- c# - 如何在 C# web api 中的查询字符串上绑定 guid 列表?
- arduino - 交流调光器EpS32
- c# - 在 C# 中使用基本身份验证返回错误消息的 RESTful 调用
- javascript - Node.js spawn:保持 StdOut 和 StdErr 的原始顺序
- swift - 优雅地检查 Swift 中的两个结构实例是否不同
- r - 使用操作按钮时如何消除将参数传递给渲染函数的延迟?