python - PySpark 窗口功能改进
问题描述
我需要用以前的记录值替换,所以我已经使用窗口函数实现了这个,但我想提高性能。您能否告知是否有其他替代方法。
from pyspark.sql import SparkSession, Window, DataFrame
from pyspark.sql.types import *
from pyspark.sql import functions as F
source = [(1,2,3),(2,3,4),(1,3,4)]
target = [(1,3,1),(3,4,1)]
schema = ['key','col1','col2']
source_df = spark.createDataFrame(source, schema=schema)
target_df = spark.createDataFrame(source, schema=schema)
df = source_df.unionAll(target_df)
window = Window.partitionBy(F.col('key')).orderBy(F.col('col2').asc())
df = df.withColumn('col1_prev', F.lag(F.col('col1_start')).over(window)\
.withColumn('col1', F.lit('col1_next'))
df.show()
1,3,1
1,2,1
1,3,3
2,3,4
3,4,1
解决方案
您可以last
在指定的时间间隔内使用该函数,例如窗口中的最后 2 行。我将其设置为maxsize
此处作为示例:
import sys
window = Window.partitionBy('key')\
.orderBy('col2')\
.rowsBetween(-sys.maxsize, -1)
df = F.last(df['col1_prev'], ignorenulls=True).over(window)
我希望它能解决你的问题。
推荐阅读
- tensorflow - 是否可以将 2 个显卡的内存加在一起来运行更大的神经网络?
- opengl - glOrtho 的实际公式是什么
- fonts - Godot - 按需制作标签,并使用 GDscript 设置其字体大小
- powershell - 将值附加到 AD 组对象
- sql - 表被锁定时如何自动停止 PostgreSQL 查询?
- .net - Net Core 中“ProducesResponseType”的 .NET 等价物是什么?
- git - git docs - “上游”总是意味着“服务器端分支”吗?
- java - 处理多线程时如何避免瓶颈?
- javascript - 从函数返回的 API 响应返回未定义
- ios - 调用 Swift 协议初始化器