首页 > 解决方案 > 在 pyspark 中使用自定义顺序选择最大/最大值

问题描述

我有一些示例数据,如下所示df1

| id1 | id2  | yyyy_mm_dd |
|-----|------|------------|
| 1   | 3245 | 2021-01-01 |
| 1   | 4564 | 2021-01-01 |
| 1   | 3546 | 2021-01-01 |
| 1   | 632  | 2021-01-01 |
| 1   | 521  | 2021-01-01 |
| 2   | 7413 | 2021-01-01 |
| ... | ...  | ...        |

然后我有第二个df跟踪status每个人每天的时间id2,命名为df2

| yyyy_mm_dd | id2  | product | status |
|------------|------|---------|--------|
| 2021-01-01 | 3245 | p1      | i      |
| 2021-01-01 | 3245 | p2      | f_c    |
| 2021-01-01 | 3245 | p3      | n_c    |
| 2021-01-01 | 4564 | p1      | n_c    |
| 2021-01-01 | 4564 | p2      | n_c    |
| 2021-01-01 | 4564 | p3      | n_c    |
| 2021-01-01 | 3546 | p1      | f_c    |
| 2021-01-01 | 3546 | p2      | n_c    |
| 2021-01-01 | 3546 | p3      | n_c    |
| 2021-01-01 | 7413 | p1      | f_c    |
| ...        | ...  | ...     | ..     |

我想创建一个输出数据框,从而id1继承statusfrom id2。我面临的问题是之间存在一对多的关系id1id2因此很难继承状态。

考虑到这一点,我想从状态中获取greatest/max值,但这也很困难,因为它们是字符串。虽然,有这样的层次结构 i > f_c > n_c

基于上述,我希望我的输出如下所示:

| yyyy_mm_dd | id1 | product | status |
|------------|-----|---------|--------|
| 2020-01-01 | 1   | p1      | i      |
| 2020-01-01 | 1   | p2      | f_c    |
| 2020-01-01 | 1   | p3      | n_c    |
| 2020-01-01 | 2   | p1      | f_c    |

对于输出,id1= 1 继承了i statusforp1因为iid2=中的最大状态(3245, 4564, 3546)id1对于= 1 和p2,也可以看到相同f_c的结果,status因为它是id2=中的最大值(3245, 4564, 3546)


我知道我可以像这样加入数据:

df3 = (
    df1
    .join(df2, on = ['yyyy_mm_dd', 'id2']
)

但我不确定如何statusid2s 中取最大,因为它不是数字的。

标签: python-3.xapache-sparkpysparkapache-spark-sql

解决方案


您可以df使用df2onid2yyyy_mm_ddcolumns 连接,然后计算行号和 orderby 一个 when 表达式用于自定义排序status

from pyspark.sql import functions as F, Window

result = df1.join(df2, ["yyyy_mm_dd", "id2"]).withColumn(
    "rn",
    F.row_number().over(
        Window.partitionBy("yyyy_mm_dd", "id1", "product").orderBy(
            F.when(F.col("status") == "i", 1).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 3)
        )
    )
).filter("rn = 1").drop("id2", "rn")

result.show()
#+----------+---+-------+------+
#|yyyy_mm_dd|id1|product|status|
#+----------+---+-------+------+
#|2021-01-01|  1|     p2|   f_c|
#|2021-01-01|  2|     p1|   f_c|
#|2021-01-01|  1|     p1|    i |
#|2021-01-01|  1|     p3|   n_c|
#+----------+---+-------+------+

或者,如果您更喜欢使用groupBywith max

result = df1.join(df2, ["yyyy_mm_dd", "id2"]).groupBy("yyyy_mm_dd", "id1", "product").agg(
    F.max(
        F.when(F.col("status") == "i", 3).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 1)
    ).alias("max_status")
).select(
    "yyyy_mm_dd", "id1", "product",
    F.when(F.col("max_status") == 3, "i")
     .when(F.col("max_status") == 2, "f_c")
     .when(F.col("max_status") == 1, "n_c").alias("status")
)

推荐阅读