sql - Using a DataFrame column's value as the group-by clause in Spark
Problem description
I have data that looks like the table below:
+----+----+--------+-------+--------+----------------------+
|User|Shop|Location| Seller|Quantity| GroupBYClause|
+----+----+--------+-------+--------+----------------------+
| 1| ABC| Loc1|Seller1| 10| Shop, location|
| 1| ABC| Loc1|Seller2| 10| Shop, location|
| 2| ABC| Loc1|Seller1| 10|Shop, location, Seller|
| 2| ABC| Loc1|Seller2| 10|Shop, location, Seller|
| 3| BCD| Loc1|Seller1| 10| location|
| 3| BCD| Loc1|Seller2| 10| location|
| 3| CDE| Loc2|Seller3| 10| location|
+----+----+--------+-------+--------+----------------------+
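The expected output is the same data with one additional column, Sum(Quantity), holding the total quantity aggregated according to the grouping each user specifies.
For example, user 1 specifies the GroupBYClause "Shop, location", so the seller is ignored and Sum(Quantity) for user 1 is 20.
Similarly, user 2 specifies "Shop, location, Seller", so Sum(Quantity) equals each row's own quantity, i.e. 10.
Expected output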
+------+----+--------+-------+--------+----------------------+-------------+
|UserId|Shop|location| Seller|Quantity| GroupBYClause|Sum(Quantity)|
+------+----+--------+-------+--------+----------------------+-------------+
| 1| ABC| Loc1|Seller1| 10| Shop, location| 20|
| 1| ABC| Loc1|Seller2| 10| Shop, location| 20|
| 2| ABC| Loc1|Seller1| 10|Shop, location, Seller| 10|
| 2| ABC| Loc1|Seller2| 10|Shop, location, Seller| 10|
| 3| BCD| Loc1|Seller1| 10| location| 20|
| 3| BCD| Loc1|Seller2| 10| location| 20|
| 3| CDE| Loc2|Seller3| 10| location| 10|
+------+----+--------+-------+--------+----------------------+-------------+
The challenge I am facing is using a column's value as the group-by clause in Spark.
Please help.
val df = spark.createDataFrame(Seq(
(1, "ABC","Loc1","Seller1", 10, "Shop, location"),
(1, "ABC","Loc1","Seller2", 10, "Shop, location"),
(2, "ABC","Loc1","Seller1", 10, "Shop, location, Seller"),
(2, "ABC","Loc1","Seller2", 10, "Shop, location, Seller"),
(3, "BCD","Loc1","Seller1", 10, "location"),
(3, "BCD","Loc1","Seller2", 10, "location"),
(3, "CDE","Loc2","Seller3", 10, "location")
)).toDF("UserId","Shop", "Location","Seller", "Quantity", "GroupBYClause")
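Solution
Try this:
Load the test data provided
// df1 holds the question's data, with the first column named User (see the schema below)
df1.show(false)
df1.printSchema()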
/**
* +----+----+--------+-------+--------+----------------------+
* |User|Shop|Location|Seller |Quantity|GroupBYClause |
* +----+----+--------+-------+--------+----------------------+
* |1 |ABC |Loc1 |Seller1|10 |Shop, location |
* |1 |ABC |Loc1 |Seller2|10 |Shop, location |
* |2 |ABC |Loc1 |Seller1|10 |Shop, location, Seller|
* |2 |ABC |Loc1 |Seller2|10 |Shop, location,Seller |
* |3 |BCD |Loc1 |Seller1|10 |location |
* |3 |BCD |Loc1 |Seller2|10 |location |
* |3 |CDE |Loc2 |Seller3|10 |location |
* +----+----+--------+-------+--------+----------------------+
*
* root
* |-- User: integer (nullable = true)
* |-- Shop: string (nullable = true)
* |-- Location: string (nullable = true)
* |-- Seller: string (nullable = true)
* |-- Quantity: integer (nullable = true)
* |-- GroupBYClause: string (nullable = true)
*/
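Compute the sum
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // enables the $"col" syntax

// Conditions on the parsed clause; $"arr" is resolved when they are used below
val isShopLocation = Seq("Shop", "location").map(array_contains($"arr", _)).reduce(_ && _)
val isShopLocationSeller = Seq("Shop", "location", "Seller").map(array_contains($"arr", _)).reduce(_ && _)
val isLocation = array_contains($"arr", "location")
df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*"))
  // Replace the column names with this row's values, most specific match first
  .withColumn("arr",
    when(isShopLocationSeller, expr("array(Shop, location, Seller)"))
      .when(isShopLocation, expr("array(Shop, location)"))
      .when(isLocation, expr("array(location)"))
  )
  // Sum the quantity over all rows that share the same user and key
  .withColumn("sum_quantity",
    sum("Quantity").over(Window.partitionBy("User", "arr")))
  .show(false)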
/**
* +----+----+--------+-------+--------+----------------------+--------------------+------------+
* |User|Shop|Location|Seller |Quantity|GroupBYClause |arr |sum_quantity|
* +----+----+--------+-------+--------+----------------------+--------------------+------------+
* |1 |ABC |Loc1 |Seller1|10 |Shop, location |[ABC, Loc1] |20 |
* |1 |ABC |Loc1 |Seller2|10 |Shop, location |[ABC, Loc1] |20 |
* |2 |ABC |Loc1 |Seller2|10 |Shop, location,Seller |[ABC, Loc1, Seller2]|10 |
* |3 |CDE |Loc2 |Seller3|10 |location |[Loc2] |10 |
* |2 |ABC |Loc1 |Seller1|10 |Shop, location, Seller|[ABC, Loc1, Seller1]|10 |
* |3 |BCD |Loc1 |Seller1|10 |location |[Loc1] |20 |
* |3 |BCD |Loc1 |Seller2|10 |location |[Loc1] |20 |
* +----+----+--------+-------+--------+----------------------+--------------------+------------+
*/
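Define the partition dynamically
// Alternating literal names and column values, as map() expects: key1, value1, key2, value2, ...
val columns = Seq("Shop", "location", "Seller").flatMap(f => Seq(lit(f), col(f)))
df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*"))
  .withColumn("map1", map(columns: _*))
  // Look up each requested column name in the row's map (TRANSFORM requires Spark 2.4+)
  .withColumn("arr", expr("TRANSFORM(arr, x -> map1[x])"))
  .withColumn("sum_quantity",
    sum("Quantity").over(Window.partitionBy("User", "arr")))
  .show(false)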
/**
* +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
* |User|Shop|Location|Seller |Quantity|GroupBYClause |arr |map1 |sum_quantity|
* +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
* |1 |ABC |Loc1 |Seller1|10 |Shop, location |[ABC, Loc1] |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20 |
* |1 |ABC |Loc1 |Seller2|10 |Shop, location |[ABC, Loc1] |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20 |
* |2 |ABC |Loc1 |Seller2|10 |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10 |
* |3 |CDE |Loc2 |Seller3|10 |location |[Loc2] |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10 |
* |2 |ABC |Loc1 |Seller1|10 |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10 |
* |3 |BCD |Loc1 |Seller1|10 |location |[Loc1] |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|20 |
* |3 |BCD |Loc1 |Seller2|10 |location |[Loc1] |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|20 |
* +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
*/
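To make the per-row logic of the map1 + TRANSFORM step easier to follow, here is a minimal sketch of the same idea on plain Scala collections, with no Spark involved. The names Sale, DynamicKeySketch, lookup, key and withSum are hypothetical and only illustrate the technique; this is a model of the computation, not the Spark implementation itself.

```scala
// Plain-Scala model of one row; the fields mirror the DataFrame columns (hypothetical type).
final case class Sale(user: Int, shop: String, location: String, seller: String,
                      quantity: Int, groupByClause: String)

object DynamicKeySketch {
  // Plays the role of map1: column name -> this row's value for that column.
  private def lookup(r: Sale): Map[String, String] =
    Map("Shop" -> r.shop, "location" -> r.location, "Seller" -> r.seller)

  // Plays the role of TRANSFORM(arr, x -> map1[x]), paired with the user.
  // A name that is not a known column becomes None, much like a null map lookup.
  def key(r: Sale): (Int, Seq[Option[String]]) = {
    val names = r.groupByClause.split("\\s*,\\s*").toSeq
    val values = lookup(r)
    (r.user, names.map(values.get))
  }

  // Plays the role of sum("Quantity").over(Window.partitionBy("User", "arr")):
  // every row keeps its place and receives the total of its own partition.
  def withSum(rows: Seq[Sale]): Seq[(Sale, Int)] = {
    val totals = rows.groupBy(key).map { case (k, rs) => k -> rs.map(_.quantity).sum }
    rows.map(r => r -> totals(key(r)))
  }
}
```

Something like this can serve as a small oracle when unit-testing the Spark version: feed both the same rows and compare the sums.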
Edit-1 (based on comments)
// input
+----+----+--------+-------+--------+----------------------+
|User|Shop|Location|Seller |Quantity|GroupBYClause |
+----+----+--------+-------+--------+----------------------+
|1 |ABC |Loc1 |Seller1|10 |Shop, location |
|1 |ABC |Loc1 |Seller2|10 |Shop, location |
|2 |ABC |Loc1 |Seller1|10 |Shop, location, Seller|
|2 |ABC |Loc1 |Seller2|10 |Shop, location,Seller |
|3 |BCD |Loc1 |Seller1|10 |location,Seller |
|3 |BCD |Loc1 |Seller2|10 |location |
|3 |CDE |Loc2 |Seller3|10 |location |
+----+----+--------+-------+--------+----------------------+
root
|-- User: integer (nullable = true)
|-- Shop: string (nullable = true)
|-- Location: string (nullable = true)
|-- Seller: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- GroupBYClause: string (nullable = true)
// Output
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
|User|Shop|Location|Seller |Quantity|GroupBYClause |arr |map1 |sum_quantity|
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
|1 |ABC |Loc1 |Seller1|10 |Shop, location |[ABC, Loc1] |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20 |
|1 |ABC |Loc1 |Seller2|10 |Shop, location |[ABC, Loc1] |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20 |
|2 |ABC |Loc1 |Seller2|10 |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10 |
|3 |BCD |Loc1 |Seller1|10 |location,Seller |[Loc1, Seller1] |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|10 |
|3 |CDE |Loc2 |Seller3|10 |location |[Loc2] |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10 |
|2 |ABC |Loc1 |Seller1|10 |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10 |
|3 |BCD |Loc1 |Seller2|10 |location |[Loc1] |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|10 |
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
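If TRANSFORM is not available (it was added in Spark 2.4), a similar key can probably be built with when and array_contains alone. The sketch below is an assumption-laden variant, not the approach used above: PositionalGroupKey, candidates and withSumQuantity are hypothetical names, the list of candidate columns is assumed to be known up front, and the clause is lower-cased so that "location" matches the Location column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object PositionalGroupKey {
  // Columns a user may group by (assumed fixed and known in advance).
  val candidates: Seq[String] = Seq("Shop", "Location", "Seller")

  def withSumQuantity(df: DataFrame): DataFrame = {
    // Parse the clause into lower-cased names, tolerating spaces around commas.
    val names = split(lower(trim(col("GroupBYClause"))), "\\s*,\\s*")
    // One slot per candidate column: the row's value if the column was requested, else null.
    val key = array(candidates.map(c => when(array_contains(names, c.toLowerCase), col(c))): _*)
    // Sum over all rows that share the same user and positional key.
    df.withColumn("sum_quantity",
      sum("Quantity").over(Window.partitionBy(col("User"), key)))
  }
}
```

Usage would be PositionalGroupKey.withSumQuantity(df1).show(false). One design difference worth noting: because every candidate column has a fixed slot, "Shop, location" and "location, Shop" land in the same partition, whereas the TRANSFORM version keeps the order written in the clause.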