java - Spark 3.0 排序并应用于组 Scala/Java
问题描述
我有 spark DataSet 让我们用 A、B、C 列查看
我要获取数据集
- 在 A 列上分组
- B 列上的排序组(不是整个数据集)
- 遍历单个组,在连续 N 行之间查找一些序列/模式,并基于形成结果数据集的标准返回行
在 Flink 中
dataset.groupBy(0).sortGroup(1, Order.ASCENDING)
.reduceGroup({})
在 Pyspark
我们可以使用 Pandas 在 group 上调用 apply 函数并在 pandas 中排序但是与 Flink 相比,它的速度非常慢 10 倍
注意:我想对分组数据进行处理并返回另一个不是标准聚合的数据集
有人可以向我指出有关如何在 Spark 中的 java/scala 中执行的类似代码吗?
解决方案
几种可能的方法取决于迭代逻辑:
使用数据集 API
给定
val df =
Seq(("a", 0, "foo"), ("b", 1, "foo"), ("a", 1, "foobar"))
.toDF("A", "B", "C")
首先对其进行一些预处理
df.select($"A", struct($"B", $"C") as $"S").show()
要得到
+---+-----------+
| A| S|
+---+-----------+
| a| [0, foo]|
| b| [1, foo]|
| a|[1, foobar]|
+---+-----------+
现在我们可以将任何 Scala 代码应用于元组 S 的序列,包括排序:
df.select($"A", struct($"B", $"C") as $"S")
.groupBy("A")
.agg(collect_list("S"))
.as[(String, Seq[(Int, String)])]
.map {
case (a, l) => (a, l.sortBy(_._1).map(_._2).maxBy(_.length))
}
.show()
使用 UDAF
实现自定义UDAF:
class MyAgg extends Aggregator[
(Int, String),
mutable.ListBuffer[(Int, String)],
/* any output type here */] {
...
并使用它聚合:
val myagg = udaf(new MyAgg())
df.select($"A", struct($"B", $"C") as "S").groupBy($"A").agg(myagg($"S"))
推荐阅读
- salesforce - Talend 与 salesforce 的连接很慢
- android - 如何交换授权代码以从 Android 获取访问代码和刷新令牌?
- .net - Azure Blob 存储 SDK v12 - BlobClient DownloadAsync 消失了吗?
- reactjs - 如何修复错误--->“DeepMap”类型上不存在属性“名称”
- python - 测试用例的并行执行不适用于 pytest-html-reporter 报告
- google-fit - 使用多个传感器时,Google Fit REST API 步骤结果与应用编号不匹配
- pandas - 尝试使用 excel pandas 读取数据...并在多个文件中出现一致的错误
- binding - 从 Svelte 中的父组件与 Slot 通信
- r - 使用登录表单(shinyauthr 包)运行 Shiny 应用程序时出错:`filter()` 输入 `..1` 出现问题。我输入`..1`
- firebase - 如何以 Firebase 作为后端部署 expo react 本机应用程序?