python - 如何检测pyspark的单调减少
问题描述
我正在使用火花DataFrame
,我想从特定列中检测任何值,其中值不会单调减少。对于这些值,我想根据排序标准将它们替换为以前的值。
这是一个概念示例,如果我有一列 value [65, 66, 62, 100, 40]
。值“100”不遵循单调下降趋势,因此应替换为 62。因此结果列表将为[65, 66, 62, 62, 40]
.
下面是我为检测必须替换的值而创建的一些代码,但是我不知道如何用前一个值替换该值,也不知道如何null
忽略lag
.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as psf
from pyspark.sql.window import Window
sc = SparkContext(appName="sample-app")
sqlc = SQLContext(sc)
rdd = sc.parallelize([(1, 65), (2, 66), (3, 62), (4, 100), (5, 40)])
df = sqlc.createDataFrame(rdd, ["id", "value"])
window = Window.orderBy(df.id).rowsBetween(-1, -1)
sdf = df.withColumn(
"__monotonic_col",
(df.value <= psf.lag(df.value, 1).over(window)) & df.value.isNotNull(),
)
sdf.show()
此代码产生以下输出:
+---+-----+---------------+
| id|value|__monotonic_col|
+---+-----+---------------+
| 1| 65| null|
| 2| 66| false|
| 3| 62| true|
| 4| 100| false|
| 5| 40| true|
+---+-----+---------------+
解决方案
首先,如果我的理解是正确的,不应该将 66 也替换(由 65),因为它不遵循下降趋势吗?
如果这是正确的解释,那么以下应该有效(我添加了一个额外的列以保持整洁,但您可以将所有内容包装到单个列创建语句中):
from pyspark.sql import functions as F
sdf = sdf.withColumn(
"__monotonic_col_value",
F.when(
F.col("__monotonic_col") | F.col("__monotonic_col").isNull(), df.value)
.otherwise(
F.lag(df.value, 1).over(window)
),
)
推荐阅读
- elasticsearch - 将Kafka主题标题显示为Kibana中的字段,logstash add_field?
- django - 多对多关系的 Django 动态表单
- c# - 对所有可能的指标值、排列进行策略优化
- jquery - 在 Django 应用程序中单击时刷新显示的数据
- azure - 如何在 Azure 中检查向 IoT 中心发送消息的来源?
- c++ - 为什么虚拟 cudaMalloc 在moderngpu 中加速interval_gather?
- sql - JPA 查询参数 IN 或 IS NULL
- firebase - Firebase 实时数据库过滤数据
- python - PYTHON CSV 文件上传 - WebUI -> Lambda ->S3
- python - 在“字符串”类型的 pd.dataframe 上使用 iloc 时,字符串比较总是错误的