首页 > 解决方案 > PySpark:根据当前行值计算行数

问题描述

我有一个带有“速度”列的 DataFrame。
我可以有效地为每一行添加一个列,其中包含 DataFrame 中的行数,使得它们的“速度”在“速度”行的 +/2 范围内?

results = spark.createDataFrame([[1],[2],[3],[4],[5],
                                 [4],[5],[4],[5],[6],
                                 [5],[6],[1],[3],[8],
                                 [2],[5],[6],[10],[12]], 
                                 ['Speed'])

results.show()

+-----+
|Speed|
+-----+
|    1|
|    2|
|    3|
|    4|
|    5|
|    4|
|    5|
|    4|
|    5|
|    6|
|    5|
|    6|
|    1|
|    3|
|    8|
|    2|
|    5|
|    6|
|   10|
|   12|
+-----+

标签: pysparkapache-spark-sqlpyspark-sql

解决方案


您可以使用窗口函数:

# Order the window by speed, and look at range [0;+2]
w = Window.orderBy('Speed').rangeBetween(0,2)

# Define a column counting the number of rows containing value Speed+2
results = results.withColumn('count+2',F.count('Speed').over(w)).orderBy('Speed')
results.show()

+-----+-----+
|Speed|count|
+-----+-----+
|    1|    6|
|    1|    6|
|    2|    7|
|    2|    7|
|    3|   10|
|    3|   10|
|    4|   11|
|    4|   11|
|    4|   11|
|    5|    8|
|    5|    8|
|    5|    8|
|    5|    8|
|    5|    8|
|    6|    4|
|    6|    4|
|    6|    4|
|    8|    2|
|   10|    2|
|   12|    1|
+-----+-----+

注意:窗口函数对研究的行本身进行计数。您可以通过在计数列中添加 -1 来纠正此问题

results = results.withColumn('count+2',F.count('Speed').over(w)-1).orderBy('Speed')

推荐阅读