python - spark中的迭代过滤器似乎不起作用
问题描述
我正在尝试逐个删除 RDD 的元素,但这不起作用,因为元素重新出现。
这是我的代码的一部分:
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0, 1, 2, 3]
所以似乎只有最后一个过滤器是“记住”。我在想,在这个循环之后,rdd 会是空的。
但是,我不明白为什么,因为每次我将通过过滤器获得的新 rdd 保存在“rdd”中,所以它不应该保留所有的转换吗?如果没有,我该怎么办?
感谢您指出我错在哪里!
解决方案
结果实际上是正确的 - 这不是 Spark 的错误。请注意,lambda 函数被定义为x != i
,并且i
不会被替换为 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像
rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)
等等
由于过滤器都是相同的,并且它们将被替换为 的最新值i
,因此在每次 for 循环迭代中只过滤掉一项。
为避免这种情况,您可以使用部分函数来确保i
将其替换到函数中:
from functools import partial
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
rdd = rdd.filter(partial(lambda x, i: x != i, i))
print(rdd.collect())
或者您可以使用reduce
:
from functools import reduce
rdd = spark.sparkContext.parallelize([0,1,2])
rdd = reduce(lambda r, i: r.filter(lambda x: x != i), range(3), rdd)
print(rdd.collect())
推荐阅读
- javascript - 数据类型 .innerHTML
- python - 我无法启动我的第一个 python nameko 服务
- reactjs - 如何使用 Gatsby 创建应用程序外壳?
- python - 为什么这种将字符串处理成单词的字典查找方法比 .split() 慢?
- gmail - 如何删除使用 sendgrid 发送的 gmail 中的“查看整个邮件”选项
- three.js - BufferGeometry 的 Uint16Array UV 属性如何映射到纹理?
- mysql - 如何使用总和处理 SQL 子查询
- reactjs - 如何在反应js中忽略证书验证
- amazon-cloudformation - 关于参数存储和云形成集成的问题
- sql-server - SQL 语法 SELECT 和 Sub SELECT 和 JOIN