pyspark - 比较两个不同的 pyspark 数据帧
问题描述
我目前正在使用需要使用 pyspark 的 API 环境。这样,我需要在两个数据帧之间执行每日比较,以确定记录是新的、更新的和删除的。
这是两个数据框的示例:
today = spark.createDataFrame([
[1, "Apple", 5000, "A"],
[2, "Banana", 4000, "A"],
[3, "Orange", 3000, "B"],
[4, "Grape", 4500, "C"],
[5, "Watermelon", 2000, "A"]
], ["item_id", "name", "cost", "classification"])
yesterday = spark.createDataFrame([
[1, "Apple", 5000, "B"],
[2, "Bananas", 4000, "A"],
[3, "Tangerine", 3000, "B"],
[4, "Grape", 4500, "C"]
], ["item_id", "name", "cost", "classification"])
我想比较两个数据框并确定什么是新的,什么是更新的。对于新项目,我很容易:
today.join(yesterday, on="item_id", how="left_anti").show()
# +---------+------------+------+----------------+
# | item_id | name | cost | classification |
# +---------+------------+------+----------------+
# | 5 | Watermelon | 2000 | A |
# +---------+------------+------+----------------+
但是对于更新的项目,我不知道如何比较这些结果。我需要为数据框的其余列获取具有不同值的所有行。在上述情况下,我的预期结果是:
# +---------+------------+------+----------------+
# | item_id | name | cost | classification |
# +---------+------------+------+----------------+
# | 1 | Apple | 5000 | A |
# | 2 | Banana | 4000 | A |
# | 3 | Orange | 3000 | B |
# +---------+------------+------+----------------+
解决方案
使用.subtract()
方法获取today
不存在的行yesterday
,然后左半连接yesterday
today.subtract(yesterday).join(yesterday, on="item_id", how="left_semi").show()
# +-------+------+----+--------------+
# |item_id| name|cost|classification|
# +-------+------+----+--------------+
# | 1| Apple|5000| A|
# | 3|Orange|3000| B|
# | 2|Banana|4000| A|
# +-------+------+----+--------------+
推荐阅读
- android - 带有可编辑部分的 TextView
- python - CherryPy 如何在获取表单中指定多个“动作”?
- python - TypeError:“lxml.etree._ElementTree”类型的对象没有 len()
- amazon-web-services - AWS 账单显示没有任何主题的 SNS 使用情况?
- maven - 本地系统中需要父子maven结构
- node.js - Nodejs和Typescript找不到模块或进程
- c - 权限被 open() 拒绝
- python - 将forloop给出的字典输出存储在python中
- python - 在 PyQt5 GUI 应用程序中分离关注点。需要单独的 Qthread 进行并发
- python - 从python中的api中过滤特定数据?