Comparing two different PySpark DataFrames

Problem description

I'm currently working in an environment that requires the PySpark API. As part of this, I need to run a daily comparison between two DataFrames to determine which records are new, which were updated, and which were deleted.

Here is a sample of the two DataFrames:

today = spark.createDataFrame([
  [1, "Apple", 5000, "A"],
  [2, "Banana", 4000, "A"],
  [3, "Orange", 3000, "B"],
  [4, "Grape", 4500, "C"],
  [5, "Watermelon", 2000, "A"]
], ["item_id", "name", "cost", "classification"])

yesterday = spark.createDataFrame([
  [1, "Apple", 5000, "B"],
  [2, "Bananas", 4000, "A"],
  [3, "Tangerine", 3000, "B"],
  [4, "Grape", 4500, "C"]
], ["item_id", "name", "cost", "classification"])

I want to compare the two DataFrames and identify what is new and what was updated. The new items are easy:

today.join(yesterday, on="item_id", how="left_anti").show()

# +---------+------------+------+----------------+
# | item_id |    name    | cost | classification |
# +---------+------------+------+----------------+
# |    5    | Watermelon | 2000 |       A        |
# +---------+------------+------+----------------+

But for the updated items, I don't know how to compare the results. I need all rows whose values differ in any of the remaining columns. For the data above, my expected result is:

# +---------+------------+------+----------------+
# | item_id |    name    | cost | classification |
# +---------+------------+------+----------------+
# |    1    |    Apple   | 5000 |       A        |
# |    2    |   Banana   | 4000 |       A        |
# |    3    |   Orange   | 3000 |       B        |
# +---------+------------+------+----------------+

Tags: pyspark, apache-spark-sql, compare

Solution

Use the .subtract() method to get the rows of today that do not appear in yesterday. Since that difference also contains the brand-new rows (item 5 here), follow it with a left semi join against yesterday on item_id so that only items that already existed yesterday remain:

today.subtract(yesterday).join(yesterday, on="item_id", how="left_semi").show()

# +-------+------+----+--------------+
# |item_id|  name|cost|classification|
# +-------+------+----+--------------+
# |      1| Apple|5000|             A|
# |      3|Orange|3000|             B|
# |      2|Banana|4000|             A|
# +-------+------+----+--------------+
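One caveat: .subtract() behaves like SQL EXCEPT DISTINCT, so it deduplicates rows; if exact duplicate rows matter in your data, .exceptAll() (Spark 2.4+) preserves multiplicity. The question also mentions deleted records; those fall out of the same pattern by swapping the direction of the left anti join (empty for the sample data, since today still contains every item_id from yesterday):

# Deleted items: item_ids present yesterday but missing from today.
yesterday.join(today, on="item_id", how="left_anti").show()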
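For completeness, a full outer join on item_id can classify every record as new, updated, deleted, or unchanged in a single pass. This is only a sketch, assuming item_id is unique within each DataFrame; the status, in_today, and in_yesterday names are illustrative, not part of the original answer:

from pyspark.sql import functions as F

# Marker columns make it easy to tell which side of the full outer join matched.
# ("in_today", "in_yesterday", and "status" are illustrative names.)
t = today.withColumn("in_today", F.lit(True)).alias("t")
y = yesterday.withColumn("in_yesterday", F.lit(True)).alias("y")

value_cols = ["name", "cost", "classification"]

# A row counts as changed when any value column differs; eqNullSafe treats
# NULL as equal to NULL, so nulls do not show up as spurious changes.
changed = F.lit(False)
for c in value_cols:
    changed = changed | ~F.col(f"t.{c}").eqNullSafe(F.col(f"y.{c}"))

result = (
    t.join(y, on="item_id", how="full_outer")
     .withColumn(
         "status",
         F.when(F.col("in_yesterday").isNull(), "new")
          .when(F.col("in_today").isNull(), "deleted")
          .when(changed, "updated")
          .otherwise("unchanged"),
     )
     # For deleted rows the today-side values are null, so fall back to yesterday's.
     .select(
         "item_id",
         "status",
         *[F.coalesce(F.col(f"t.{c}"), F.col(f"y.{c}")).alias(c) for c in value_cols],
     )
)
result.show()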
