python - 在 UDF Pyspark 中更新变量值
问题描述
我想要一个通过“值”列的 udf 函数,并检查下一个值是否为当前行值的 50% 或更多。如果它在 50% 以内,那么我想包含值“是”,如果不是,那么我不想包含该值。如果值在最后一个值和下一个值之间下降得太快,则不应包含在内,但如果它逐渐下降且与上一个包含值相比不超过 50%,则没关系。这就是为什么不包括 id 5 的 .1 而包括 id 9 的 0.1 的原因,因为它遵循一个从 0.4 逐渐下降不超过 50% 的值。我正在考虑在 udf 中使用一个变量来跟踪最后一个可接受的值,但我不确定如何去做。
rows = sc.parallelize([[1, .9, 'yes'], [2, .7, 'yes'], [3, .4, 'yes'], [4, .15, 'no'], [5, .1, 'no'], [7, .3, 'yes'], [8, .2, 'yes'], [9, .1, 'yes']])
rows_df = rows.toDF(["ID", 'Values', 'Include'])
#preview data
rows_df.show()
#show data schema
rows_df.printSchema()
+---+------+-------+
| ID|Values|Include|
+---+------+-------+
| 1| 0.9| yes|
| 2| 0.7| yes|
| 3| 0.4| yes|
| 4| 0.15| no|
| 5| 0.1| no|
| 7| 0.3| yes|
| 8| 0.2| yes|
| 9| 0.1| yes|
+---+------+-------+
解决方案
为了实现您的目标,您不必使用 UDF(事实上我认为这不可能),您可以使用在窗口上工作的各种功能,例如lag
.
我不得不承认我并不完全理解您的要求(为什么 5. 应该是“不”?),但是在 , 之间,lag
您应该能够实现它。您可以在docs中阅读有关它们的更多信息。基于先前值执行逻辑的示例:lead
last
from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when, lit
windowSpec = Window.orderBy("Id")
withPrevious = rows_df.withColumn("prevVal", lag(rows_df["Values"]).over(windowSpec))
withPrevious.withColumn("Include2",
when(col("prevVal").isNull(), "yes")\
.when(col("Values") >= 0.5 * col("prevVal"), lit("yes"))\
.otherwise("no"))\
.show()
+---+------+-------+-------+--------+
| ID|Values|Include|prevVal|Include2|
+---+------+-------+-------+--------+
| 1| 0.9| yes| null| yes|
| 2| 0.7| yes| 0.9| yes|
| 3| 0.4| yes| 0.7| yes|
| 4| 0.15| no| 0.4| no|
| 5| 0.1| no| 0.15| yes|
| 7| 0.3| yes| 0.1| yes|
| 8| 0.2| yes| 0.3| yes|
| 9| 0.1| yes| 0.2| yes|
+---+------+-------+-------+--------+
推荐阅读
- python - 错误是什么:“ValueError:赋值目标是只读的”?
- c# - Photon Server:是不是所有的ClientPeer实例都自动运行在光纤中?
- python - 在 python 中读取和处理大型 wav 文件的最佳方法是什么
- python - 套接字模块(python)工作但不使用指定的端口号?
- amazon-ec2 - 访问在 AWS 中运行的 Jupyter - 访问它时出现证书错误
- shell - 使用 sed 替换 shell 脚本中的值
- git - Git说master是最新的,但不是
- android - 改造 - 基本网址必须以 / 结尾
- heroku - 电子邮件验证失败
- spring - 非 Spring Boot 应用程序,Spring 数据 jpa 保存不起作用