Using a DataFrame column value as the group-by clause in Spark

Problem description

I have data that looks like the table below:

+----+----+--------+-------+--------+----------------------+
|User|Shop|Location| Seller|Quantity|         GroupBYClause|
+----+----+--------+-------+--------+----------------------+
|   1| ABC|    Loc1|Seller1|      10|        Shop, location|
|   1| ABC|    Loc1|Seller2|      10|        Shop, location|
|   2| ABC|    Loc1|Seller1|      10|Shop, location, Seller|
|   2| ABC|    Loc1|Seller2|      10|Shop, location, Seller|
|   3| BCD|    Loc1|Seller1|      10|              location|
|   3| BCD|    Loc1|Seller2|      10|              location|
|   3| CDE|    Loc2|Seller3|      10|              location|
+----+----+--------+-------+--------+----------------------+

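The expected final output is the same data with one additional column, Sum(Quantity), which holds the sum of Quantity computed according to the grouping each user specifies in GroupBYClause.

For example, User 1 specifies GroupBYClause as "Shop, location", so regardless of the seller, Sum(Quantity) for User 1 is 20.

Similarly, for User 2 the GroupBYClause is "Shop, location, Seller", so Sum(Quantity) equals the quantity of each individual row, i.e. 10.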

Expected output

+------+----+--------+-------+--------+----------------------+-------------+
|UserId|Shop|location| Seller|Quantity|         GroupBYClause|Sum(Quantity)|
+------+----+--------+-------+--------+----------------------+-------------+
|     1| ABC|    Loc1|Seller1|      10|        Shop, location|           20|
|     1| ABC|    Loc1|Seller2|      10|        Shop, location|           20|
|     2| ABC|    Loc1|Seller1|      10|Shop, location, Seller|           10|
|     2| ABC|    Loc1|Seller2|      10|Shop, location, Seller|           10|
|     3| BCD|    Loc1|Seller1|      10|              location|           20|
|     3| BCD|    Loc1|Seller2|      10|              location|           20|
|     3| CDE|    Loc2|Seller3|      10|              location|           10|
+------+----+--------+-------+--------+----------------------+-------------+
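To make the rule concrete, here is a minimal plain-Scala sketch (no Spark) that reproduces the Sum(Quantity) values in the table above. The `Sale` case class and the helpers `field`, `groupKey` and `withSums` are hypothetical names introduced only for illustration, and the sketch assumes a clause names only Shop, Location or Seller, matched case-insensitively.

```scala
// Plain Scala collections only: a reference for the expected numbers, not a Spark solution.
final case class Sale(user: Int, shop: String, location: String,
                      seller: String, quantity: Int, groupByClause: String)

object ExpectedSums {
  // The sample rows from the question.
  val sampleRows: Seq[Sale] = Seq(
    Sale(1, "ABC", "Loc1", "Seller1", 10, "Shop, location"),
    Sale(1, "ABC", "Loc1", "Seller2", 10, "Shop, location"),
    Sale(2, "ABC", "Loc1", "Seller1", 10, "Shop, location, Seller"),
    Sale(2, "ABC", "Loc1", "Seller2", 10, "Shop, location, Seller"),
    Sale(3, "BCD", "Loc1", "Seller1", 10, "location"),
    Sale(3, "BCD", "Loc1", "Seller2", 10, "location"),
    Sale(3, "CDE", "Loc2", "Seller3", 10, "location")
  )

  // Value of the column that one clause entry names (case-insensitive). Hypothetical helper.
  def field(s: Sale, name: String): String = name.trim.toLowerCase match {
    case "shop"     => s.shop
    case "location" => s.location
    case "seller"   => s.seller
    case other      => throw new IllegalArgumentException(s"unknown column: $other")
  }

  // Grouping key of a row: the user plus the values of the columns listed in its clause.
  def groupKey(s: Sale): (Int, List[String]) =
    (s.user, s.groupByClause.split(",").toList.map(n => field(s, n)))

  // Pair every row with the total quantity of the group it belongs to.
  def withSums(rows: Seq[Sale]): Seq[(Sale, Int)] = {
    val totals = rows.groupBy(r => groupKey(r)).map { case (k, rs) => k -> rs.map(_.quantity).sum }
    rows.map(r => r -> totals(groupKey(r)))
  }

  def main(args: Array[String]): Unit =
    withSums(sampleRows).foreach { case (r, total) =>
      println(s"${r.user} ${r.shop} ${r.location} ${r.seller} ${r.quantity} -> $total")
    }
}
```

The key point is that the grouping key is built per row from the row's own clause, which is what a Spark solution has to express as a partitioning expression.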

The challenge I am facing is using a column value as the group-by clause in Spark.

Please help.

val df = spark.createDataFrame(Seq(
    (1, "ABC","Loc1","Seller1", 10, "Shop, location"),
    (1, "ABC","Loc1","Seller2", 10, "Shop, location"),
    (2, "ABC","Loc1","Seller1", 10, "Shop, location, Seller"),
    (2, "ABC","Loc1","Seller2", 10, "Shop, location, Seller"),
    (3, "BCD","Loc1","Seller1", 10, "location"),
    (3, "BCD","Loc1","Seller2", 10, "location"),
    (3, "CDE","Loc2","Seller3", 10, "location")
  )).toDF("UserId","Shop", "Location","Seller", "Quantity", "GroupBYClause")

Tags: sql, scala, apache-spark

Solution

Try this:

Load the test data provided

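    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import spark.implicits._ // assumes a SparkSession named `spark`, as in spark-shell

    // The question's rows, with the first column named "User" to match the schema
    // printed below; row 4 is written "Shop, location,Seller" as in that output.
    // (Built from tuples like this, the integer columns print as nullable = false.)
    val df1 = Seq(
      (1, "ABC", "Loc1", "Seller1", 10, "Shop, location"),
      (1, "ABC", "Loc1", "Seller2", 10, "Shop, location"),
      (2, "ABC", "Loc1", "Seller1", 10, "Shop, location, Seller"),
      (2, "ABC", "Loc1", "Seller2", 10, "Shop, location,Seller"),
      (3, "BCD", "Loc1", "Seller1", 10, "location"),
      (3, "BCD", "Loc1", "Seller2", 10, "location"),
      (3, "CDE", "Loc2", "Seller3", 10, "location")
    ).toDF("User", "Shop", "Location", "Seller", "Quantity", "GroupBYClause")

    df1.show(false)
    df1.printSchema()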
    /**
      * +----+----+--------+-------+--------+----------------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |
      * +----+----+--------+-------+--------+----------------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |
      * |3   |BCD |Loc1    |Seller1|10      |location              |
      * |3   |BCD |Loc1    |Seller2|10      |location              |
      * |3   |CDE |Loc2    |Seller3|10      |location              |
      * +----+----+--------+-------+--------+----------------------+
      *
      * root
      * |-- User: integer (nullable = true)
      * |-- Shop: string (nullable = true)
      * |-- Location: string (nullable = true)
      * |-- Seller: string (nullable = true)
      * |-- Quantity: integer (nullable = true)
      * |-- GroupBYClause: string (nullable = true)
      */

Compute the sum

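    // Predicates on the split clause ("arr"); only these three clauses are handled.
    val isShopLocation = Seq("Shop", "location").map(array_contains($"arr", _)).reduce(_ && _)
    val isShopLocationSeller = Seq("Shop", "location", "Seller").map(array_contains($"arr", _)).reduce(_ && _)
    val isLocation = array_contains($"arr", "location")

    df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*")) // column names listed in the clause
      // Replace the names with the row's values; the most specific clause is tested first.
      // "location" resolves to the Location column because Spark SQL is case-insensitive by default.
      .withColumn("arr",
        when(isShopLocationSeller, expr("array(Shop, location, Seller)"))
          .when(isShopLocation, expr("array(Shop, location)"))
          .when(isLocation, expr("array(location)")))
      // Sum Quantity over the rows of the same user that share the same grouping values.
      .withColumn("sum_quantity",
        sum("Quantity").over(Window.partitionBy("User", "arr")))
      .show(false)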

    /**
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |sum_quantity|
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |20          |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |20          |
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|10          |
      * |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |10          |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|10          |
      * |3   |BCD |Loc1    |Seller1|10      |location              |[Loc1]              |20          |
      * |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |20          |
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      */

Define the partition dynamically

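    // Requires Spark 2.4+ for the TRANSFORM higher-order function.
    // Alternating key/value arguments for map(): "Shop", Shop, "location", Location, ...
    val columns = Seq("Shop", "location", "Seller").flatMap(f => Seq(lit(f), col(f)))
    df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*")) // column names listed in the clause
      .withColumn("map1", map(columns: _*))                      // column name -> this row's value
      .withColumn("arr", expr("TRANSFORM(arr, x -> map1[x])"))   // names replaced by values
      .withColumn("sum_quantity",
        sum("Quantity").over(Window.partitionBy("User", "arr")))
      .show(false)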

    /**
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |map1                                              |sum_quantity|
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20          |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20          |
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10          |
      * |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10          |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10          |
      * |3   |BCD |Loc1    |Seller1|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|20          |
      * |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|20          |
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      */
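If only the columns from the question's expected output are wanted, the dynamic approach can be wrapped up roughly as follows. This is a sketch under assumptions, not part of the original answer: the object `DynamicGroupBy`, the helper names `withGroupedSum` and `sample`, and the temporary columns `grp_names`, `col_lookup` and `grp_values` are all hypothetical. It also makes two design choices the answer does not: clause names are lower-cased and sorted so that "Shop, location" and "location, Shop" land in the same group, and the names are included in the partition so that two different clauses with coincidentally equal values are not merged. It assumes Spark 2.4+ and that every groupable column is a string.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object DynamicGroupBy {
  // Columns a clause is allowed to name; an assumption for this sketch.
  val groupable: Seq[String] = Seq("Shop", "Location", "Seller")

  def withGroupedSum(df: DataFrame): DataFrame = {
    // Lookup map keyed by lower-cased column name: "shop" -> Shop, "location" -> Location, ...
    val lookup = map(groupable.flatMap(c => Seq(lit(c.toLowerCase), col(c))): _*)
    df
      // Normalise the clause: trim, lower-case, split on commas, sort the names.
      .withColumn("grp_names", sort_array(split(lower(trim(col("GroupBYClause"))), "\\s*,\\s*")))
      .withColumn("col_lookup", lookup)
      // Replace each name with this row's value; unknown names become null.
      .withColumn("grp_values", expr("transform(grp_names, n -> col_lookup[n])"))
      .withColumn("Sum(Quantity)",
        sum("Quantity").over(Window.partitionBy("User", "grp_names", "grp_values")))
      // Drop the helper columns so only the original columns plus the sum remain.
      .drop("grp_names", "col_lookup", "grp_values")
  }

  // The question's sample rows, with the first column named "User" as in this answer.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      (1, "ABC", "Loc1", "Seller1", 10, "Shop, location"),
      (1, "ABC", "Loc1", "Seller2", 10, "Shop, location"),
      (2, "ABC", "Loc1", "Seller1", 10, "Shop, location, Seller"),
      (2, "ABC", "Loc1", "Seller2", 10, "Shop, location, Seller"),
      (3, "BCD", "Loc1", "Seller1", 10, "location"),
      (3, "BCD", "Loc1", "Seller2", 10, "location"),
      (3, "CDE", "Loc2", "Seller3", 10, "location")
    ).toDF("User", "Shop", "Location", "Seller", "Quantity", "GroupBYClause")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dynamic-group-by").getOrCreate()
    withGroupedSum(sample(spark)).show(false)
    spark.stop()
  }
}
```

Row order after a window operation is not guaranteed, so compare the result with the expected table by content rather than by position.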

Edit-1 (based on comments)

// input
+----+----+--------+-------+--------+----------------------+
|User|Shop|Location|Seller |Quantity|GroupBYClause         |
+----+----+--------+-------+--------+----------------------+
|1   |ABC |Loc1    |Seller1|10      |Shop, location        |
|1   |ABC |Loc1    |Seller2|10      |Shop, location        |
|2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|
|2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |
|3   |BCD |Loc1    |Seller1|10      |location,Seller       |
|3   |BCD |Loc1    |Seller2|10      |location              |
|3   |CDE |Loc2    |Seller3|10      |location              |
+----+----+--------+-------+--------+----------------------+

root
 |-- User: integer (nullable = true)
 |-- Shop: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Seller: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- GroupBYClause: string (nullable = true)

// Output
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
|User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |map1                                              |sum_quantity|
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
|1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20          |
|1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20          |
|2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10          |
|3   |BCD |Loc1    |Seller1|10      |location,Seller       |[Loc1, Seller1]     |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|10          |
|3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10          |
|2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10          |
|3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|10          |
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+

