首页 > 解决方案 > 直方图 - 以并行方式进行

问题描述

+----+----+--------+
| Id | M1 |  trx   |
+----+----+--------+
| 1  | M1 | 11.35  |
| 2  | M1 | 3.4    |
| 3  | M1 | 10.45  |
| 2  | M1 | 3.95   |
| 3  | M1 | 20.95  |
| 2  | M2 | 25.55  |
| 1  | M2 |  9.95  |
| 2  | M2 | 11.95  |
| 1  | M2 |  9.65  |
| 1  | M2 | 14.54  |
+----+----+--------+

使用上面的数据框,我应该能够使用下面的代码生成如下直方图。 类似的Queston在这里

val (Range,counts) = df
.select(col("trx"))
.rdd.map(r => r.getDouble(0))
.histogram(10)
// Range: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1) 

但这里的问题是,如何根据列“M1”并行创建直方图?这意味着我需要为列值 M1 和 M2 输出两个直方图。

标签: scalaapache-spark

解决方案


首先,您需要知道histogram生成两个单独的顺序作业。一种用于检测数据的最小值和最大值,一种用于计算实际的直方图。您可以使用 Spark UI 进行检查。

我们可以按照相同的方案在任意多的列上构建直方图,只需两个作业。然而,我们不能使用histogram仅用于处理一个双打集合的函数。我们需要自己去实现。第一份工作很简单。

val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head

然后我们在本地计算直方图的范围。请注意,我对所有列使用相同的范围。它允许在列之间轻松比较结果(通过将它们绘制在同一图上)。不过,每列有不同的范围只是对这段代码的一个小修改。

val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
    .scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value

那是第一部分。然后,我们可以使用 UDF 来确定每个值在什么范围内结束,并与 spark 并行计算所有直方图。

val range_index = udf((x : Double) => hist_ranges.lastIndexWhere(x >= _))
val hist_df = df
    .withColumn("rangeIndex", range_index('trx))
    .groupBy("M1", "rangeIndex")
    .count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2|         2|    2|
| M1|         0|    2|
| M2|         5|    1|
| M1|         3|    2|
| M2|         3|    1|
| M1|         7|    1|
| M2|        10|    1|
+---+----------+-----+

作为奖励,您可以使用 RDD API 或通过收集数据帧并在 scala 中修改数据来塑造数据以在本地(在驱动程序内)使用它。

这是使用火花的一种方法,因为这是关于火花的问题;-)

val hist_map = hist_df.rdd
    .map(row => row.getAs[String]("M1") ->
             (row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
    .groupByKey
    .mapValues( _.toMap)
    .mapValues( hists => (1 to hist_size)
                    .map(i => hists.getOrElse(i, 0L)).toArray )
    .collectAsMap

编辑:如何为每列值建立一个范围:

我们不是计算 M1 的最小值和最大值,而是为列的每个值计算它groupBy

val min_max_map = df.groupBy("M1")
    .agg(min('trx), max('trx))
    .rdd.map(row => row.getAs[String]("M1") ->
      (row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
    .collectAsMap // maps each column value to a tuple (min, max)

然后我们调整 UDF 以便它使用这个映射,我们就完成了。

// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
    val hist_step = (max_trx - min_trx) / hist_size
    (1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
    .map(key => key ->
        generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
    .toMap

val range_index = udf((x : Double, m1 : String) =>
                       range_map(m1).lastIndexWhere(x >= _))

最后,只需替换range_index('trx)range_index('trx, 'M1),每列值将有一个范围。


推荐阅读