scala - 地图火花数据帧上的while循环
问题描述
我在每个会话中使用 scala API 在 spark 中的mapType 遇到了这个问题我们正在发送一个地图,您可以在其中找到用户访问的类别与每个类别中的事件数量相关联
[ home & personal items > interior -> 1, vehicles > cars -> 1]
并非所有用户都访问相同数量的类别,因此地图的大小会根据 user_id 发生变化
我需要计算按类别分组的会话数,以便我需要遍历地图,虽然它不是我以前尝试过的东西,但它不是空的
while (size(col("categoriesRaw")) !== 0) {
df.select(
explode(col("categoriesRaw"))
)
.select(
col("key").alias("categ"),
col("value").alias("number_of_events")
)
}
但我面临一些错误,例如:
type mismatch;
found : org.apache.spark.sql.Column
required: Booleansbt
解决方案
我不确定你想用 while 循环做什么。无论如何,您可以使用 REPL 检查您用作条件的表达式是 aColumn
而不是 a Boolean
,因此是异常。
> size(col("categoriesRaw")) !== 0
res1: org.apache.spark.sql.Column = (NOT (size(categoriesRaw) = 0))
基本上,这是一个需要由 SparkSQL 在 awhere
或select
任何其他使用 Columns 的函数中求值的表达式。
不过,使用您的 spark 代码,您几乎就在那里,您只需要添加一个groupBy
即可到达您想要的位置。让我们从创建数据开始。
import spark.implicits._
val users = Seq( "user 1" -> Map("home & personal items > interior" -> 1,
"vehicles > cars" -> 1),
"user 2" -> Map("vehicles > cars" -> 3))
val df = users.toDF("user", "categoriesRaw")
然后,您不需要 while 循环来遍历映射的所有值。explode
为你做的正是:
val explodedDf = df.select( explode('categoriesRaw) )
explodedDf.show(false)
+--------------------------------+-----+
|key |value|
+--------------------------------+-----+
|home & personal items > interior|1 |
|vehicles > cars |1 |
|vehicles > cars |3 |
+--------------------------------+-----+
最后,你可以使用 groupBy add 得到你想要的。
explodedDf
.select('key as "categ", 'value as "number_of_events")
.groupBy("categ")
.agg(count('*), sum('number_of_events))
.show(false)
+--------------------------------+--------+---------------------+
|categ |count(1)|sum(number_of_events)|
+--------------------------------+--------+---------------------+
|home & personal items > interior|1 |1 |
|vehicles > cars |2 |4 |
+--------------------------------+--------+---------------------+
注意:我不确定您是要计算会话(第一列)还是事件(第二列),所以我计算了两者。
推荐阅读
- pandas - 无法使用 python 从 AWS S3 中的现有 csv 文件中删除行
- flutter - 在颤动中从树中销毁有状态的小部件
- java - JAVA Spring boot:DELETE方法不存在必需的字符串参数“xxx”
- elasticsearch - 动态文本内容
- docker - 在 docker swarm 中创建服务时如何设置 docker image 时区?
- python - 如何使用 Pyspark 在所有文件中使用 Spark 变量?
- python - IMAPLIB - 超时错误
- c - 在其他函数中无法通过strlen 获取字符串的长度?
- python - 验证缩写 Pandas Python
- android - 如何使用导航组件从 Activity 到 Fragment 获取结果?