scala - Spark中的累积产品
问题描述
我尝试在 Spark Scala 中实现一个累积产品,但我真的不知道如何去做。我有以下数据框:
Input data:
+--+--+--------+----+
|A |B | date | val|
+--+--+--------+----+
|rr|gg|20171103| 2 |
|hh|jj|20171103| 3 |
|rr|gg|20171104| 4 |
|hh|jj|20171104| 5 |
|rr|gg|20171105| 6 |
|hh|jj|20171105| 7 |
+-------+------+----+
我想有以下输出:
Output data:
+--+--+--------+-----+
|A |B | date | val |
+--+--+--------+-----+
|rr|gg|20171105| 48 | // 2 * 4 * 6
|hh|jj|20171105| 105 | // 3 * 5 * 7
+-------+------+-----+
解决方案
只要数字严格为正(0 也可以处理,如果存在,使用coalesce
),如您的示例中所示,最简单的解决方案是计算对数之和并取指数:
import org.apache.spark.sql.functions.{exp, log, max, sum}
val df = Seq(
("rr", "gg", "20171103", 2), ("hh", "jj", "20171103", 3),
("rr", "gg", "20171104", 4), ("hh", "jj", "20171104", 5),
("rr", "gg", "20171105", 6), ("hh", "jj", "20171105", 7)
).toDF("A", "B", "date", "val")
val result = df
.groupBy("A", "B")
.agg(
max($"date").as("date"),
exp(sum(log($"val"))).as("val"))
由于这使用了 FP 算法,因此结果将不准确:
result.show
+---+---+--------+------------------+
| A| B| date| val|
+---+---+--------+------------------+
| hh| jj|20171105|104.99999999999997|
| rr| gg|20171105|47.999999999999986|
+---+---+--------+------------------+
但四舍五入后应该足以满足大多数应用程序的需要。
result.withColumn("val", round($"val")).show
+---+---+--------+-----+
| A| B| date| val|
+---+---+--------+-----+
| hh| jj|20171105|105.0|
| rr| gg|20171105| 48.0|
+---+---+--------+-----+
如果这还不够,您可以定义一个UserDefinedAggregateFunction
或Aggregator
(如何在 Spark SQL 中定义和使用用户定义的聚合函数?)或使用功能 API reduceGroups
:
import scala.math.Ordering
case class Record(A: String, B: String, date: String, value: Long)
df.withColumnRenamed("val", "value").as[Record]
.groupByKey(x => (x.A, x.B))
.reduceGroups((x, y) => x.copy(
date = Ordering[String].max(x.date, y.date),
value = x.value * y.value))
.toDF("key", "value")
.select($"value.*")
.show
+---+---+--------+-----+
| A| B| date|value|
+---+---+--------+-----+
| hh| jj|20171105| 105|
| rr| gg|20171105| 48|
+---+---+--------+-----+
推荐阅读
- python - 错误引发 InvalidToken cryptography.fernet.InvalidToken
- android - 使用 cURL 在 android 中获取 paypal 访问令牌
- python - 删除 numpy.ndarray 中的中间括号集
- javascript - 如何在 JavaScript 中用空字符串替换“未定义”
- java - 我的电脑出现 Android Studio gradle 构建错误
- reactjs - 使用 React 从 Firestore 中的 id 数组中获取文档数组
- android - 在 Android 中显示延迟的相机镜头 - 如何实现?
- terraform - 配置 Terraform map(object{}) 变量,其中一些键:值是敏感的,而另一些则具有正则表达式验证。for_each 处理不了
- amazon-web-services - 使用动态端口映射时目标组中的端口错误
- c++ - 当 if/else 链的长度超过 100 行时,缩进格式损坏 - Visual Studio 2017 C++