首页 > 解决方案 > PySpark 窗口功能改进

问题描述

我需要用以前的记录值替换,所以我已经使用窗口函数实现了这个,但我想提高性能。您能否告知是否有其他替代方法。

from pyspark.sql import SparkSession, Window, DataFrame
from pyspark.sql.types import *
from pyspark.sql import functions as F

source = [(1,2,3),(2,3,4),(1,3,4)]
target = [(1,3,1),(3,4,1)]
schema = ['key','col1','col2']
source_df = spark.createDataFrame(source, schema=schema)
target_df = spark.createDataFrame(source, schema=schema)

df = source_df.unionAll(target_df)

window = Window.partitionBy(F.col('key')).orderBy(F.col('col2').asc())


df = df.withColumn('col1_prev', F.lag(F.col('col1_start')).over(window)\
       .withColumn('col1', F.lit('col1_next'))

df.show()

1,3,1
1,2,1
1,3,3
2,3,4
3,4,1

标签: pythonpysparkhivewindow-functions

解决方案


您可以last在指定的时间间隔内使用该函数,例如窗口中的最后 2 行。我将其设置为maxsize此处作为示例:

import sys
window = Window.partitionBy('key')\
               .orderBy('col2')\
               .rowsBetween(-sys.maxsize, -1)

df = F.last(df['col1_prev'], ignorenulls=True).over(window)

我希望它能解决你的问题。


推荐阅读