首页 > 解决方案 > Python并发框架中的flatMap等价物

问题描述

我有一段这样的代码:

for x in range(10):
    for v in f(x):
        print(v)

我想并行化它,所以我可能会这样做

ex = ProcessPollExecutor()
for vs in ex.map(f, range(10)):
    for v in vs:
        print(v)

但是,f是一个生成器,所以上面的代码并不能真正起作用。我可以更改f为返回一个列表,但这个列表太大而无法放入内存。

理想情况下,我想要类似flatMappyspark 的东西。但是,直接使用 pysparksc.parallelize(range(10)).flatMap(f).toLocalIterator() 似乎不起作用。至少当初始列表如此短时,我无法让它使用多个处理器。(我已经尝试了为什么这个简单的 Spark 程序没有使用多个内核?没有运气中的所有内容。)

我可能可以使用队列自己滚动一些东西,但我想知道在 Python 并发框架中是否有一种预期的方法来并行化此类代码?

标签: pythonparallel-processing

解决方案


我最终使用 PyStreams 编写了我自己的multiprocessing小型

它通过缓冲具有非常高效的flapmap支持,并支持其他类似Spark的功能,如下所示:

>>> sentences = ["a word is a word", "all words are words"]
>>> (Stream(sentences)
...           .flatmap(lambda sentence: sentence.split())
...           .chunk_by_key(lambda x: hash(x) % 10)
...           .reduce_once(lambda chunk: len(set(chunk)))
...           .sum())
6

推荐阅读