Spark Scala: counting occurrences of unique values in a column

Problem description

How do I correctly turn the values of a column into a Map(k -> v), where k is each unique value and v is its occurrence count? I am doing this inside a groupBy.

// Note: Spark passes ArrayType columns to Scala UDFs as Seq, not Array
val getMapUDF = udf((arr: Seq[Long]) => arr.groupBy(identity).map { case (x, y) => x -> y.size })
    
df
    .withWatermark("time", "30 seconds")
    .groupBy(window(col("time"), "1 minutes").alias("someTime"), col("foo"), col("bar"))
    .agg(count("*").alias("rowCount"), collect_list(col("aaa")).alias("aaaList"))
    .withColumn("qtypes", getMapUDF(col("aaaList")))
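The core of the UDF can be checked without Spark at all. A minimal sketch in plain Scala (the object and method names here are illustrative, not from the post):

```scala
object CountMapSketch {
  // Same logic as the UDF body: group identical values, then count each group.
  def countOccurrences(arr: Seq[Long]): Map[Long, Int] =
    arr.groupBy(identity).map { case (value, hits) => value -> hits.size }

  def main(args: Array[String]): Unit = {
    // The first input row from the question
    println(countOccurrences(Seq(1L, 1L, 1L, 2L, 3L, 3L))) // Map(1 -> 3, 2 -> 1, 3 -> 2)
  }
}
```

The parameter type is `Seq[Long]` rather than `Array[Long]` because Spark hands an ArrayType column to a Scala UDF as a `Seq` (a `WrappedArray`); declaring `Array[Long]` fails at runtime with a ClassCastException.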

Edit: input data

+-----+-----+---------------+
| foo | bar | foobar        |
+-----+-----+---------------+
| aaa | a   | [1,1,1,2,3,3] |
| bbb | b   | [1,2,3,1,2]   |
+-----+-----+---------------+

Expected output

+-----+-----+--------------------+
| foo | bar | foobarMap          |
+-----+-----+--------------------+
| aaa | a   | [1->3, 2->1, 3->2] |
| bbb | b   | [1->2, 2->2, 3->1] |
+-----+-----+--------------------+

Q: Could I use map_from_arrays for this?

Tags: scala, apache-spark, dictionary, counter

Solution


I think you can do something other than collect_list, so that you get what you want without a second groupBy. I am assuming your input DataFrame df looks like the one below.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

df.show
+---+---+---+
|foo|bar|aaa|
+---+---+---+
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  1|
|aaa|  a|  2|
|aaa|  a|  3|
|aaa|  a|  3|
|bbb|  b|  1|
|bbb|  b|  2|
|bbb|  b|  3|
|bbb|  b|  1|
|bbb|  b|  2|
+---+---+---+

val df2 = df.withColumn(
    "foobarmap",
    struct(
        $"aaa",
        count("aaa").over(Window.partitionBy("foo", "bar", "aaa"))
    )
).groupBy(
    "foo", "bar"
).agg(
    count("*").alias("rowcount"), 
    map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")

df2.show(2,0)
+---+---+--------+------------------------+
|foo|bar|rowcount|foobarmap               |
+---+---+--------+------------------------+
|aaa|a  |6       |[2 -> 1, 3 -> 2, 1 -> 3]|
|bbb|b  |5       |[2 -> 2, 3 -> 1, 1 -> 2]|
+---+---+--------+------------------------+
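Why this works: within each (foo, bar, aaa) window partition, the struct ($"aaa", count) is identical on every row, so collect_set collapses it to a single entry per distinct value, and map_from_entries then assembles those (key, value) structs into a map. The same computation expressed with plain Scala collections, as a sketch (object and method names are illustrative):

```scala
object WindowCountSketch {
  // Mimics count(...).over(Window.partitionBy(foo, bar, aaa)) followed by
  // collect_set + map_from_entries, one result map per (foo, bar) group.
  def foobarMap(rows: Seq[(String, String, Long)]): Map[(String, String), Map[Long, Long]] =
    rows
      .groupBy { case (foo, bar, _) => (foo, bar) }   // the groupBy("foo", "bar")
      .map { case (key, group) =>
        // per-value row counts within the group = the window count;
        // building a Map deduplicates, like collect_set on identical structs
        key -> group.groupBy(_._3).map { case (v, hits) => v -> hits.size.toLong }
      }
}
```

Running it on the sample rows from the question reproduces the foobarmap column shown above.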

To add the watermark and the grouping by window, your code can be implemented as follows:

val df2 = df.withWatermark(
    "time", "30 seconds"
).withColumn(
    "foobarmap",
    struct(
        $"aaa",
        count("aaa").over(Window.partitionBy(window(col("time"), "1 minutes"), "foo", "bar", "aaa"))
    ).alias("foobarmap")
).groupBy(
    window(col("time"), "1 minutes"), "foo", "bar"
).agg(
    count("*").alias("rowcount"), 
    map_from_entries(collect_set("foobarmap")).alias("foobarmap")
).orderBy("foo")
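As for the map_from_arrays question from the post: it would also work, but it expects two aligned array columns (keys and values), so you would first have to build those two arrays yourself; map_from_entries is the more direct fit for a collected set of (key, value) structs. Semantically, map_from_arrays is just a zip of two position-aligned sequences, as this plain-Scala sketch shows (illustrative names):

```scala
object MapFromArraysSketch {
  // map_from_arrays(keys, values) pairs keys(i) with values(i), like a zip.
  def mapFromArrays(keys: Seq[Long], values: Seq[Long]): Map[Long, Long] = {
    require(keys.length == values.length, "key and value arrays must be aligned")
    keys.zip(values).toMap
  }
}
```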
