python - Python并发框架中的flatMap等价物
问题描述
我有一段这样的代码:
for x in range(10):
for v in f(x):
print(v)
我想并行化它,所以我可能会这样做
ex = ProcessPollExecutor()
for vs in ex.map(f, range(10)):
for v in vs:
print(v)
但是,f
是一个生成器,所以上面的代码并不能真正起作用。我可以更改f
为返回一个列表,但这个列表太大而无法放入内存。
理想情况下,我想要类似flatMap
pyspark 的东西。但是,直接使用 pysparksc.parallelize(range(10)).flatMap(f).toLocalIterator()
似乎不起作用。至少当初始列表如此短时,我无法让它使用多个处理器。(我已经尝试了为什么这个简单的 Spark 程序没有使用多个内核?没有运气中的所有内容。)
我可能可以使用队列自己滚动一些东西,但我想知道在 Python 并发框架中是否有一种预期的方法来并行化此类代码?
解决方案
我最终使用 PyStreams 编写了我自己的multiprocessing
小型库。
它通过缓冲具有非常高效的flapmap支持,并支持其他类似Spark的功能,如下所示:
>>> sentences = ["a word is a word", "all words are words"]
>>> (Stream(sentences)
... .flatmap(lambda sentence: sentence.split())
... .chunk_by_key(lambda x: hash(x) % 10)
... .reduce_once(lambda chunk: len(set(chunk)))
... .sum())
6