python - 如何使用分组数据的后续行的值来决定使用pyspark的当前行的值
问题描述
在下面的数据集中,我想将won_offer
列的值更改为 a1
或 a 0
。问题是我需要客户代码组合的后续行来决定该列的值。
如果当前行日期的 30 天内的下一行之一包含order
并且价格低于当前行的价格,则该行列的 0won_offer
可以变为 1。
样本数据集:
analysis = sqlContext.createDataFrame(
[
('customer1', 'code1', 'date', 'order', 1.7, 0, 1),
('customer1', 'code2', 'date', 'offer', 1.5, 0, 2),
('customer1', 'code2', 'date', 'offer', 2.0, 0, 2),
('customer2', 'code1', 'date', 'offer', 1.2, 0,4),
('customer2', 'code1', 'date', 'order', 1.1, 0,4),
('customer2', 'code1', 'date', 'order', 2.0, 0,4),
('customer2', 'code1', 'date', 'offer', 1.2, 0,4)
],
('customer', 'code', 'order_date', 'type', 'price', 'final_offer', 'counter')
)
我尝试了这样的方法,但它不起作用,因为我不知道如何将多行传递给我的 udf:
w = \
Window.partitionBy('customer','code').orderBy('orderoffer_date')
@F.udf(returnType=IntegerType())
def logic_udf(counter, curr_date, next_dates, current_type, next_types, curr_price, next_prices) :
for i in range(len(counter)):
if (next_dates[i] < curr_date+30):
if (next_types[i] == 'order') & (next_prices[i] < curr_price ):
return 1
else:
return 0
else:
return 0
analysis = analysis.withColumn('won_offer',
logic(analysis.counter, analysis.order_date,lead(analysis.order_date,
analysis.n).over(w), analysis.type,lead(analysis.type,
analysis.n).over(w), analysis.price, lead(analysis.price,
analysis.n).over(w)))
所需的输出:
desired_result = sqlCtx.createDataFrame(
[
('customer1', 'code1', 'date', 'order', 1.7, 0, 1),
('customer1', 'code2', 'date', 'offer', 1.5, 0, 2),
('customer1', 'code2', 'date', 'offer', 2.0, 0, 2),
('customer2', 'code1', 'date', 'offer', 1.2, 1, 4),
('customer2', 'code1', 'date', 'order', 1.1, 1, 4),
('customer2', 'code1', 'date', 'order', 1.0, 0, 4),
('customer2', 'code1', 'date', 'offer', 1.2, 0, 4)
],
('customer', 'code', 'order_date', 'type', 'price', 'final_offer', 'counter')
)
我意识到我的问题很复杂。如果有人能告诉我如何将多行分组数据传递给 udf,我已经得到了很大帮助。
简而言之:主要目标是通过查看下一行中的多列(并且仍在其特定组中)来确定一行中列的值。
提前致谢!查尔斯
解决方案
您可以使用带有 sql 函数的 window 来替换 logic_udf。由于您仅在当前行之后使用第一行,因此您可以将当前行之后的第一行添加到当前行。
from pyspark.sql import functions as F
analysis \
.withColumn('next_order_date', F.first('order_date').over(w)) \
.withColumn('next_type', F.first('type').over(w)) \
.withColumn('next_price', F.first('price').over(w)) \
.withColumn('won_offer', F.when(condition, 1).otherwise(0))
推荐阅读
- owl - 如何从 RDF 图包含一个语句
- tensorflow - " AttributeError: 'str' 对象没有属性 'decode' "jupyter notebook 上的 maskrcnn 实现
- go - 在 Golang 中编辑 ZIP 存档
- perl - Perl脚本处理输入文件并实现期望输出
- reactjs - 编码风格:如何区分来自 parent 或 redux 的 props?
- python - Discord.py:如何获取触发 on_member_update 的用户名?
- java - Spring Boot 和 Hibernate 有没有办法使用 postgres 批量插入具有 UUID 主键的实体?
- c# - 我无法将我的答案输出为多个数字,它表示指定的演员表无效
- python - 如何在石墨烯django中上传文件
- r - 卡住:使用 mipfp 的二维 8 x 17 表的 IFP