pyspark - 比较 2 个 pyspark 数据框列并根据它更改另一列的值
问题描述
我有一个问题,我从我编写的图形算法生成了一个数据框。问题是我希望在每次运行图形代码后保持底层组件的值基本相同。
这是生成的示例数据框:
df = spark.createDataFrame(
[
(1, 'A1'),
(1, 'A2'),
(1, 'A3'),
(2, 'B1'),
(2, 'B2'),
(3, 'B3'),
(4, 'C1'),
(4, 'C2'),
(4, 'C3'),
(4, 'C4'),
(5, 'D1'),
],
['old_comp_id', 'db_id']
)
在另一次运行之后,值完全改变了,所以新的运行具有这样的值,
df2 = spark.createDataFrame(
[
(2, 'A1'),
(2, 'A2'),
(2, 'A3'),
(3, 'B1'),
(3, 'B2'),
(3, 'B3'),
(1, 'C1'),
(1, 'C2'),
(1, 'C3'),
(1, 'C4'),
(4, 'D1'),
],
['new_comp_id', 'db_id']
)
所以我需要做的就是比较上面两个数据帧之间的值,并根据关联的数据库id改变组件id的值。
- 如果 database_id 相同,则将组件 id 更新为来自第一个数据帧
- 如果它们不同,则分配一个全新的 comp_id (new_comp_id = max(old_comp_id)+1)
到目前为止,这是我想出的:
old_ids = df.groupBy("old_comp_id").agg(F.collect_set(F.col("db_id")).alias("old_db_id"))
new_ids = df2.groupBy("new_comp_id").agg(F.collect_set(F.col("db_id")).alias("new_db_id"))
joined = new_ids.join(old_ids,old_ids.old_comp_id == new_ids.new_comp_id,"outer")
joined.withColumn("update_comp", F.when( F.col("new_db_id") == F.col("old_db_id"), F.col('old_comp_id')).otherwise(F.max(F.col("old_comp_id")+1))).show()
解决方案
为了在非聚合列中使用聚合函数,您应该使用窗口函数。
首先,您使用 db_id 外连接 DF:
from pyspark.sql.functions import when, col, max
joinedDF = df.join(df2, df["db_id"] == df2["new_db_id"], "outer")
然后,开始构建窗口(您在其中按 db_id 分组,并按 old_comp_id 排序,以便在第一行中具有最高值的 old_comp_id。
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy("db_id")\
.orderBy(desc("old_comp_id"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
然后,您使用 windowSpec 构建最大列
from pyspark.sql.functions import max
maxCompId = max(col("old_comp_id")).over(windowSpec)
然后,将其应用于选择
joinedDF.select(col("db_id"), when(col("new_db_id").isNotNull(), col("old_comp_id")).otherwise(maxCompId+1).alias("updated_comp")).show()
有关更多信息,请参阅文档(http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Window)
希望这可以帮助
推荐阅读
- javascript - JavaScript - 可以在变量声明期间定义先前声明的对象的属性吗?
- flutter - 如何在 Flutter DataColumn 小部件中将标签居中?
- json - 如何使用 circe 将丢失的 json 数组解码为空列表
- apache - 重定向循环,遥远的那么近
- javascript - 为什么ajax请求没有执行?
- python - Python 使用 importlib 从包目录中导入模块
- python - 仅使用 bash 将 IP 地址或文件发送到病毒总数
- javascript - 将 Google Maps VueJS 组件 (vue2-google-maps) 集成到 Laravel 刀片视图中
- python - NPM 重建阻止程序
- intellij-idea - IntelliJ IDEA Scratch 中的“找不到符号”错误