首页 > 解决方案 > spark中的迭代过滤器似乎不起作用

问题描述

我正在尝试逐个删除 RDD 的元素,但这不起作用,因为元素重新出现。

这是我的代码的一部分:

rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd=rdd.filter(lambda x:x!=i)
print(rdd.collect())
[0, 1, 2, 3]

所以似乎只有最后一个过滤器是“记住”。我在想,在这个循环之后,rdd 会是空的。

但是,我不明白为什么,因为每次我将通过过滤器获得的新 rdd 保存在“rdd”中,所以它不应该保留所有的转换吗?如果没有,我该怎么办?

感谢您指出我错在哪里!

标签: pythonapache-sparkpysparkrdd

解决方案


结果实际上是正确的 - 这不是 Spark 的错误。请注意,lambda 函数被定义为x != i,并且i不会被替换为 lambda 函数。所以在 for 循环的每次迭代中,RDD 看起来像

rdd
rdd.filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i)
rdd.filter(lambda x: x != i).filter(lambda x: x != i).filter(lambda x: x != i)

等等

由于过滤器都是相同的,并且它们将被替换为 的最新值i,因此在每次 for 循环迭代中只过滤掉一项。

为避免这种情况,您可以使用部分函数来确保i将其替换到函数中:

from functools import partial
 
rdd = spark.sparkContext.parallelize([0,1,2,3,4])
for i in range(5):
    rdd = rdd.filter(partial(lambda x, i: x != i, i))

print(rdd.collect())

或者您可以使用reduce

from functools import reduce

rdd = spark.sparkContext.parallelize([0,1,2])
rdd = reduce(lambda r, i: r.filter(lambda x: x != i), range(3), rdd)
print(rdd.collect())

推荐阅读