首页 > 解决方案 > 如何检测pyspark的单调减少

问题描述

我正在使用火花DataFrame,我想从特定列中检测任何值,其中值不会单调减少。对于这些值,我想根据排序标准将它们替换为以前的值。

这是一个概念示例,如果我有一列 value [65, 66, 62, 100, 40]。值“100”不遵循单调下降趋势,因此应替换为 62。因此结果列表将为[65, 66, 62, 62, 40].

下面是我为检测必须替换的值而创建的一些代码,但是我不知道如何用前一个值替换该值,也不知道如何null忽略lag.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as psf
from pyspark.sql.window import Window

sc = SparkContext(appName="sample-app")
sqlc = SQLContext(sc)

rdd = sc.parallelize([(1, 65), (2, 66), (3, 62), (4, 100), (5, 40)])
df = sqlc.createDataFrame(rdd, ["id", "value"])

window = Window.orderBy(df.id).rowsBetween(-1, -1)
sdf = df.withColumn(
    "__monotonic_col",
    (df.value <= psf.lag(df.value, 1).over(window)) & df.value.isNotNull(),
)


sdf.show()

此代码产生以下输出:

+---+-----+---------------+
| id|value|__monotonic_col|
+---+-----+---------------+
|  1|   65|           null|
|  2|   66|          false|
|  3|   62|           true|
|  4|  100|          false|
|  5|   40|           true|
+---+-----+---------------+

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


首先,如果我的理解是正确的,不应该将 66 也替换(由 65),因为它不遵循下降趋势吗?

如果这是正确的解释,那么以下应该有效(我添加了一个额外的列以保持整洁,但您可以将所有内容包装到单个列创建语句中):

from pyspark.sql import functions as F

sdf = sdf.withColumn(
    "__monotonic_col_value",
    F.when(
        F.col("__monotonic_col")  | F.col("__monotonic_col").isNull(), df.value)
    .otherwise(
        F.lag(df.value, 1).over(window)
    ),
)

推荐阅读