apache-spark - SparkSQL Windows:基于数组列创建框架
问题描述
我正在寻找使用 SparkSQL 的窗口函数,但在框架规范上有一个自定义条件。
正在操作的数据帧如下:
+--------------------+--------------------+--------------------+-----+
| userid| elementid| prerequisites|score|
+--------------------+--------------------+--------------------+-----+
|a |1 |[] | 1 |
|a |2 |[] | 1 |
|a |3 |[] | 1 |
|b |1 |[] | 1 |
|a |4 |[1, 2] | 1 |
+--------------------+--------------------+--------------------+-----+
列中的每个元素prerequisites
都是另一行列中的值elementid
。
我想创建一个按 分区的窗口userid
,然后抓取elementid
包含在当前行列中的所有前面的行prerequisites
。
一旦我到达这个窗口,我想sum
在score
列上执行一个。
上述示例的所需输出:
+--------------------+--------------------+--------------------+-----+
| userid| elementid| prerequisites|sum |
+--------------------+--------------------+--------------------+-----+
|a |1 |[] | 0 |
|a |2 |[] | 0 |
|a |3 |[] | 0 |
|b |1 |[] | 0 |
|a |4 |[1, 2] | 2 |
+--------------------+--------------------+--------------------+-----+
请注意,因为 usera
是唯一具有其元素先决条件的用户,所以它是唯一具有 > 0 的用户sum
。
我看到的最接近的问题是这个问题,它利用了collect_list。
但是,这并不能构建一个窗口,而是收集一个潜在的 ID 列表。有人对如何构建上述窗口有任何想法吗?
解决方案
scala> import org.apache.spark.sql.expressions.{Window,UserDefinedFunction}
scala> df.show()
+------+---------+-------------+-----+
|userid|elementid|prerequisites|score|
+------+---------+-------------+-----+
| a| 1| []| 1|
| a| 2| []| 1|
| a| 3| []| 1|
| b| 1| []| 1|
| a| 4| [1, 2]| 1|
+------+---------+-------------+-----+
scala> df.printSchema
root
|-- userid: string (nullable = true)
|-- elementid: string (nullable = true)
|-- prerequisites: array (nullable = true)
| |-- element: string (containsNull = true)
|-- score: string (nullable = true)
scala> val W = Window.partitionBy("userid")
scala> val df1 = df.withColumn("elementidList", collect_set(col("elementid")).over(W))
.withColumn("elementidScoreMap", map_from_arrays(col("elementidList"), collect_list(col("score").cast("long")).over(W)))
.withColumn("common", array_intersect(col("prerequisites"), col("elementidList")))
.drop("elementidList", "score")
scala> def getSumUDF:UserDefinedFunction = udf((Score:Map[String,Long], Id:String) => {
| var out:Long = 0
| Id.split(",").foreach{ x => out = Score(x.toString) + out}
| out})
scala> df1.withColumn("sum", when(size(col("common")) =!= 0 ,getSumUDF(col("elementidScoreMap"), concat_ws(",",col("prerequisites")))).otherwise(lit(0)))
.drop("elementidScoreMap", "common")
.show()
+------+---------+-------------+---+
|userid|elementid|prerequisites|sum|
+------+---------+-------------+---+
| b| 1| []| 0|
| a| 1| []| 0|
| a| 2| []| 0|
| a| 3| []| 0|
| a| 4| [1, 2]| 2|
+------+---------+-------------+---+
推荐阅读
- javascript - if/else Javascript 语句返回相反的值
- ruby-on-rails - 如何在 Ruby on Rails 中使用数组对键进行分组和合并
- mysql - MySQL存储过程`错误代码:1064`在运行时
- python - Python pandas:根据较低级别的间距单列到多列
- json - AWS powershell commandlet (Write-CWDashboard) 输出消息“应该与 oneOf 中的一个模式完全匹配”
- python - 为什么当我使用 multiprocessing.Process 运行时 ZeroMQ 无法通信?
- c++ - 使用 Stroustrup 示例的 condition_vairable::wait_for() 问题
- loops - 如何循环寄存器输出的主机变量
- spring-webflux - 从 Mono.fromCallable 返回 Mono.empty()
- java - Apache Camel 到 Firebase 云消息传递 API 400 错误请求错误 NOT_A_JSON_REQUEST