sql - How do I convert this SQL query to PySpark code?
Problem description
I have this table:
df = spark.createDataFrame(
    [
        (1, 12345, "a@gmail.com", "2020-01-01"),
        (1, 12345, "a@gmail.com", "2020-01-02"),
        (1, 23456, "a@gmail.com", "2020-01-03"),
        (1, 34567, "a@gmail.com", "2020-01-04"),
        (1, 12345, "a@gmail.com", "2020-01-05"),
        (1, 45678, "a@gmail.com", "2020-01-06"),
        (1, 45678, "a@gmail.com", "2020-01-07"),
        (2, 56789, "b@gmail.com", "2020-01-01"),
        (2, 56789, "b@gmail.com", "2020-01-02"),
        (2, 56789, "c@gmail.com", "2020-01-03"),
        (2, 67890, "c@gmail.com", "2020-01-04"),
        (2, 67890, "c@gmail.com", "2020-01-05"),
        (3, 78901, "d@gmail.com", "2020-01-01"),
        (3, 78901, "d@gmail.com", "2020-01-02"),
        (3, 78901, "d@gmail.com", "2020-01-03"),
    ],
    ["id", "phone_number", "email", "date"],
)
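I want to select every row that is either the first date for its id, or where the phone number or email address has changed since the previous date.
I achieved this by creating a temporary view and running a raw SQL query against it, like this: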
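To make the rule concrete, here is a plain-Python sketch (no Spark involved) that applies it to the sample rows above. The helper name changed_rows is hypothetical and only for illustration; a reference like this can be handy for checking a Spark result on small data.

```python
from itertools import groupby
from operator import itemgetter

# Sample rows from the question: (id, phone_number, email, date).
rows = [
    (1, 12345, "a@gmail.com", "2020-01-01"),
    (1, 12345, "a@gmail.com", "2020-01-02"),
    (1, 23456, "a@gmail.com", "2020-01-03"),
    (1, 34567, "a@gmail.com", "2020-01-04"),
    (1, 12345, "a@gmail.com", "2020-01-05"),
    (1, 45678, "a@gmail.com", "2020-01-06"),
    (1, 45678, "a@gmail.com", "2020-01-07"),
    (2, 56789, "b@gmail.com", "2020-01-01"),
    (2, 56789, "b@gmail.com", "2020-01-02"),
    (2, 56789, "c@gmail.com", "2020-01-03"),
    (2, 67890, "c@gmail.com", "2020-01-04"),
    (2, 67890, "c@gmail.com", "2020-01-05"),
    (3, 78901, "d@gmail.com", "2020-01-01"),
    (3, 78901, "d@gmail.com", "2020-01-02"),
    (3, 78901, "d@gmail.com", "2020-01-03"),
]


def changed_rows(rows):
    """Keep the first row of each id, plus every row whose (phone_number, email)
    differs from the previous row of the same id, ordered by date."""
    kept = []
    # groupby only merges adjacent items, so sort by (id, date) first;
    # ISO-formatted date strings sort chronologically.
    for _, group in groupby(sorted(rows, key=itemgetter(0, 3)), key=itemgetter(0)):
        previous = None  # contact details of the previous row in this id
        for row in group:
            contact = (row[1], row[2])
            if previous is None or contact != previous:
                kept.append(row)
            previous = contact
    return kept


for row in changed_rows(rows):
    print(row)
```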
df.createOrReplaceTempView("df")
df = spark.sql(
"""
SELECT a.*
FROM (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS a
LEFT JOIN (SELECT ROW_NUMBER() OVER (PARTITION BY id ORDER BY date) AS row, id, phone_number, email, date FROM df) AS b
ON a.row = b.row + 1 AND a.id = b.id
WHERE a.phone_number != b.phone_number OR b.phone_number IS NULL OR a.email != b.email OR b.email IS NULL
"""
)
However, I would prefer to get the same result using only PySpark functions. How can I convert this SQL query to PySpark?
Here is what I have tried so far:
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
a = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
b = df.withColumn("row", F.row_number().over(W.partitionBy("id").orderBy("date")))
df = a.join(b, on=[a.row == b.row + 1, a.id == b.id], how="left").where(
    (a.phone_number != b.phone_number)
    | (b.phone_number.isNull())
    | (a.email != b.email)
    | (b.email.isNull())
)
Solution
I would do it a bit differently. Rather than following your SQL, apply your business rule directly:
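from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Each id's rows in date order
w = Window.partitionBy("id").orderBy("date")
df.withColumn(
    "rnk", F.row_number().over(w)  # 1 marks the first date of each id
).withColumn(
    # Previous row's phone_number and email packed into one struct (null on the first row)
    "old", F.lag(F.struct([F.col("phone_number"), F.col("email")])).over(w)
).where(
    (F.col("rnk") == 1)
    | (F.col("phone_number") != F.col("old.phone_number"))
    | (F.col("email") != F.col("old.email"))
).show()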
+---+------------+-----------+----------+---+--------------------+
| id|phone_number|      email|      date|rnk|                 old|
+---+------------+-----------+----------+---+--------------------+
|  1|       12345|a@gmail.com|2020-01-01|  1|                null|
|  1|       23456|a@gmail.com|2020-01-03|  3|[12345, a@gmail.com]|
|  1|       34567|a@gmail.com|2020-01-04|  4|[23456, a@gmail.com]|
|  1|       12345|a@gmail.com|2020-01-05|  5|[34567, a@gmail.com]|
|  1|       45678|a@gmail.com|2020-01-06|  6|[12345, a@gmail.com]|
|  3|       78901|d@gmail.com|2020-01-01|  1|                null|
|  2|       56789|b@gmail.com|2020-01-01|  1|                null|
|  2|       56789|c@gmail.com|2020-01-03|  3|[56789, b@gmail.com]|
|  2|       67890|c@gmail.com|2020-01-04|  4|[56789, c@gmail.com]|
+---+------------+-----------+----------+---+--------------------+
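Note: you can replace the test on rnk with the test F.col("old").isNull(),
since old is null only on the first row of each id (so you do not need to compute rnk at all).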