python - 带有管理器和异步方法的 multiprocessing.pool
问题描述
我正在尝试使用 Manager() 在进程之间共享字典并尝试了以下代码:
from multiprocessing import Manager, Pool
def f(d):
d['x'] += 2
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
d['x'] = 2
p= Pool(4)
for _ in range(2000):
p.map_async(f, (d,)) #apply_async, map
p.close()
p.join()
print (d) # expects this result --> {'x': 4002}
使用 map_async 和 apply_async,打印的结果总是不同的(例如 {'x': 3838}, {'x': 3770})。但是,使用 map 会给出预期的结果。另外,我尝试使用 Process 而不是 Pool,结果也不同。
有什么见解吗?非阻塞部分和竞态条件不是由经理处理的吗?
解决方案
当您调用map
(而不是map_async
)时,它将阻塞,直到处理器完成您传递的所有请求,在您的情况下,这只是对 function 的一次调用f
。因此,即使您的池大小为 4,您实际上也是一次处理 2000 个进程。要真正并行执行,您应该执行一次p.map(f, [d]*2000)
而不是循环。
但是当您调用 时map_async
,您不会阻塞并返回一个结果对象。get
对结果对象的调用将阻塞,直到进程完成,并将返回函数调用的结果。因此,现在您一次最多运行 4 个进程。但是字典的更新不会跨处理器序列化。我修改了代码以d[x] += 2
通过使用多处理锁来强制序列化。您将看到结果现在是 4002。
from multiprocessing import Manager, Pool, Lock
def f(d):
lock.acquire()
d['x'] += 2
lock.release()
def init(l):
global lock
lock = l
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
d['x'] = 2
lock = Lock()
p = Pool(4, initializer=init, initargs=(lock,)) # Create the multiprocessing lock that is sharable by all the processes
results = [] # if the function returnd a result we wanted
for _ in range(2000):
results.append(p.map_async(f, (d,))) #apply_async, map
"""
for i in range(2000): # if the function returned a result we wanted
results[i].get() # wait for everything to finish
"""
p.close()
p.join()
print(d)
推荐阅读
- android - 为什么在kotlin中使用“newInstance()”而不是关键字“new”来创建xmlPullParserFactory?
- react-native - React Native Debugger 阻止网络请求
- vue.js - 没有 v-model 的 Vue.JS 无线电输入
- css - 无法使用带有样式组件的关键帧设置嵌套动画的样式
- c# - 图表绑定与数据库问题导致日期格式
- arrays - Swift - 在数组中搜索是否包含字符串并附加到另一个数组
- objective-c - 迭代 IOService 对象时,迭代器是否进入子节点?
- c++ - 如何将基于范围的 for 循环与未来向量一起使用
- c# - Xamarin.Forms 中的自定义嵌套 ListView
- c - 为什么我们不能将点用于新创建的指向结构的指针