scala - spark scala列到列唯一值的计数器
问题描述
如何正确获取列值作为 Map(k->v),其中 k 是唯一值,v 是出现计数?我在groupby中做。
val getMapUDF = udf((arr: Array[Long]) => {arr.groupBy(identity).map{ case (x,y) => x -> y.size}})
df
.withWatermark("time", "30 seconds")
.groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
.agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
.withColumn("qtypes", getMapUDF(col("foobar")))
编辑:输入
+-----------+-------------------+
| foo | bar | foobar |
+-----------+-------------------+
| aaa | a | [1,1,1,2,3,3] |
| bbb | b | [1,2,3,1,2] |
+-----------+-------------------+
预期产出
+-----------+--------------------+
| foo | bar | foobarMap |
+-----------+--------------------+
| aaa | a | [1->3, 2->1, 3->2] |
| bbb | b | [1->2, 2->2, 3->1] |
+-----------+--------------------+
问:我可以用map_from_arrays
吗?
解决方案
我认为可以做一些事情,而不是collect_list
这样你就可以在不做 2 的情况下得到你想要的groupBy
。我假设您的输入数据如下df
所示。
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
df.show
+---+---+---+
|foo|bar|aaa|
+---+---+---+
|aaa| a| 1|
|aaa| a| 1|
|aaa| a| 1|
|aaa| a| 2|
|aaa| a| 3|
|aaa| a| 3|
|bbb| b| 1|
|bbb| b| 2|
|bbb| b| 3|
|bbb| b| 1|
|bbb| b| 2|
+---+---+---+
val df2 = df.withColumn(
"foobarmap",
struct(
$"aaa",
count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
)
).groupBy(
"foo", "bar"
).agg(
count("*").alias("rowcount"),
map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
df2.show(2,0)
+---+---+--------+------------------------+
|foo|bar|rowcount|foobarmap |
+---+---+--------+------------------------+
|aaa|a |6 |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b |5 |[2 -> 2, 3 -> 1, 1 -> 2]|
+---+---+--------+------------------------+
要添加水印和按窗口分组,可以按如下方式实现您的代码:
val df2 = df.withWatermark(
"time", "30 seconds"
).withColumn(
"foobarmap",
struct(
$"aaa",
count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), "foo", "bar", "aaa"))
).alias("foobarmap")
).groupBy(
window(col("time"), "1 minutes"), "foo", "bar"
).agg(
count("*").alias("rowcount"),
map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
推荐阅读
- asp.net-mvc - @Html.EditorFor 查看缓存旧数据,无法显示日期选择器
- sql - UPDATE 语句与 Sql Server 2014 上的 FOREIGN KEY 约束冲突
- mysql - 用正则表达式替换字符串以获得从机器人框架中的 MySQL 查询中获取的值
- sql - 外键表中其他列的唯一性约束
- css - 无论大小和格式如何,在 div 中居中图像而不拉伸它
- java - 对于 Java8 上的方法调用
- c# - 代码正在绕过等待并且不会停止等待 api 完成
- android - 如何将所有列放在房间中?
- plotly-dash - 如何获取鼠标选中的文本(高亮文本)?
- python - 安装 Django 频道错误