python-3.x - 在 pyspark 中使用自定义顺序选择最大/最大值
问题描述
我有一些示例数据,如下所示df1
:
| id1 | id2 | yyyy_mm_dd |
|-----|------|------------|
| 1 | 3245 | 2021-01-01 |
| 1 | 4564 | 2021-01-01 |
| 1 | 3546 | 2021-01-01 |
| 1 | 632 | 2021-01-01 |
| 1 | 521 | 2021-01-01 |
| 2 | 7413 | 2021-01-01 |
| ... | ... | ... |
然后我有第二个df
跟踪status
每个人每天的时间id2
,命名为df2
:
| yyyy_mm_dd | id2 | product | status |
|------------|------|---------|--------|
| 2021-01-01 | 3245 | p1 | i |
| 2021-01-01 | 3245 | p2 | f_c |
| 2021-01-01 | 3245 | p3 | n_c |
| 2021-01-01 | 4564 | p1 | n_c |
| 2021-01-01 | 4564 | p2 | n_c |
| 2021-01-01 | 4564 | p3 | n_c |
| 2021-01-01 | 3546 | p1 | f_c |
| 2021-01-01 | 3546 | p2 | n_c |
| 2021-01-01 | 3546 | p3 | n_c |
| 2021-01-01 | 7413 | p1 | f_c |
| ... | ... | ... | .. |
我想创建一个输出数据框,从而id1
继承status
from id2
。我面临的问题是之间存在一对多的关系id1
,id2
因此很难继承状态。
考虑到这一点,我想从状态中获取greatest
/max
值,但这也很困难,因为它们是字符串。虽然,有这样的层次结构 i > f_c > n_c
。
基于上述,我希望我的输出如下所示:
| yyyy_mm_dd | id1 | product | status |
|------------|-----|---------|--------|
| 2020-01-01 | 1 | p1 | i |
| 2020-01-01 | 1 | p2 | f_c |
| 2020-01-01 | 1 | p3 | n_c |
| 2020-01-01 | 2 | p1 | f_c |
对于输出,id1
= 1 继承了i
status
forp1
因为i
是id2
=中的最大状态(3245, 4564, 3546)
。id1
对于= 1 和p2
,也可以看到相同f_c
的结果,status
因为它是id2
=中的最大值(3245, 4564, 3546)
。
我知道我可以像这样加入数据:
df3 = (
df1
.join(df2, on = ['yyyy_mm_dd', 'id2']
)
但我不确定如何status
在id2
s 中取最大,因为它不是数字的。
解决方案
您可以df
使用df2
onid2
和yyyy_mm_dd
columns 连接,然后计算行号和 orderby 一个 when 表达式用于自定义排序status
from pyspark.sql import functions as F, Window
result = df1.join(df2, ["yyyy_mm_dd", "id2"]).withColumn(
"rn",
F.row_number().over(
Window.partitionBy("yyyy_mm_dd", "id1", "product").orderBy(
F.when(F.col("status") == "i", 1).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 3)
)
)
).filter("rn = 1").drop("id2", "rn")
result.show()
#+----------+---+-------+------+
#|yyyy_mm_dd|id1|product|status|
#+----------+---+-------+------+
#|2021-01-01| 1| p2| f_c|
#|2021-01-01| 2| p1| f_c|
#|2021-01-01| 1| p1| i |
#|2021-01-01| 1| p3| n_c|
#+----------+---+-------+------+
或者,如果您更喜欢使用groupBy
with max
:
result = df1.join(df2, ["yyyy_mm_dd", "id2"]).groupBy("yyyy_mm_dd", "id1", "product").agg(
F.max(
F.when(F.col("status") == "i", 3).when(F.col("status") == "f_c", 2).when(F.col("status") == "n_c", 1)
).alias("max_status")
).select(
"yyyy_mm_dd", "id1", "product",
F.when(F.col("max_status") == 3, "i")
.when(F.col("max_status") == 2, "f_c")
.when(F.col("max_status") == 1, "n_c").alias("status")
)
推荐阅读
- windows - %~dp0 在尝试从 .txt 文件接收内容但在其他 .bat 文件中工作时不起作用?
- flutter - Flutter:运行 pub get 时 image_picker 失败
- asp.net-core - 如何将 healthcheck 端点添加到 ApiExplorer,以便 Swashbuck 将其包含在生成的 swagger.json 中
- html - 在android studio typescript的数组中加粗一个特定的关键字
- node.js - 如何在猫鼬中获取特定日期的数据?
- c# - 使用 Windows 窗体处理通知
- node.js - 将 JS 注入到具有匹配文件名的 HTML 页面中
- php - 允许客户在 WooCommerce 中更改订单状态
- node.js - 控制台中的节点打印 [Circular]
- torch - 我找不到使用 torch.autograd.set_detect_anomaly(True) 的就地操作