apache-spark - 如何在pySpark中有条件地替换值并将替换后的值用于下一个条件
问题描述
首先,希望我正确格式化我的问题。
我有这个数据框:
df = sc.parallelize([
('1112', 1, 0, 1, '2018-05-01'),
('1111', 1, 1, 1, '2018-05-01'),
('1111', 1, 3, 2, '2018-05-04'),
('1111', 1, 1, 2, '2018-05-05'),
('1111', 1, 1, 2, '2018-05-06'),
]).toDF(["customer_id", "buy_count", "date_difference", "expected_answer", "date"]).cache()
df.show()
+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer| date|
+-----------+---------+---------------+---------------+----------+
| 1111| 1| 1| 1|2018-05-01|
| 1111| 1| 3| 2|2018-05-04|
| 1111| 1| 1| 2|2018-05-05|
| 1111| 1| 1| 2|2018-05-06|
| 1112| 1| 0| 1|2018-05-01|
+-----------+---------+---------------+---------------+----------+
我想创建“expected_answer”列:
如果客户超过 3 天没有购买(date_difference >=3),我想将他的 buy_count 增加 1。之后的每次购买都需要有新的 buy_count,除非他再 3 天不购买case buy_count 将再次增加。
这是我的代码以及我使用它的程度。问题似乎是 spark 实际上并没有估算值,而是创建了一个新列。有没有办法克服这个问题?我也尝试过使用 Hive,结果完全相同。
from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import when
windowSpec = func.lag(df['buy_count']).\
over(Window.partitionBy(df['customer_id']).\
orderBy(df['date'].asc()))
df.withColumn('buy_count', \
when(df['date_difference'] >=3, windowSpec +1).when(windowSpec.isNull(), 1)\
.otherwise(windowSpec)).show()
+-----------+---------+---------------+---------------+----------+
|customer_id|buy_count|date_difference|expected_answer| date|
+-----------+---------+---------------+---------------+----------+
| 1112| 1| 0| 1|2018-05-01|
| 1111| 1| 1| 1|2018-05-01|
| 1111| 2| 3| 2|2018-05-04|
| 1111| 1| 1| 2|2018-05-05|
| 1111| 1| 1| 2|2018-05-06|
+-----------+---------+---------------+---------------+----------+
我怎样才能得到预期的结果?提前致谢。
解决方案
终于想通了。感谢大家指出类似的案例。
我的印象是 SUM() over Partition 将对整个分区求和,而不仅仅是对当前行之前的所有内容求和。幸运的是,我能够用一个非常简单的 SQL 解决我的问题:
SELECT SUM(CASE WHEN(date_difference>=3) THEN 1 ELSE 0 END) OVER (PARTITION BY customer_id ORDER BY date)
FROM df
sqlContext.sql(qry).show()
推荐阅读
- c# - 为什么在根据 .NET Standard 编译 .NET Framework 项目时缺少此 NuGet 依赖项?
- angular - 未捕获的参考错误:模块在工作应用程序时未定义角度 7
- google-apps-script - 浏览数据并用空格替换“X”
- php - 在文本中查找主题标签
- c# - 为什么我可以在 c#7.3 中将 ref struct 声明为类的成员?
- java - 如何在 Spring Boot 中从 application.properties 文件中进行可配置的 Retryable maxAttempts 和退避
- javascript - 如何防止空白页对 http 错误做出反应
- kubernetes - Calico 的 Kubernetes DNS 和 NetworkPolicy 不起作用
- c++ - 如何获取正确的线程 ID 和值
- python - 如何更改它以使用 aq 表进行强化学习