python - 多进程生成器方法仅对每个进程工作一次
问题描述
我有一个看起来像这样的生成器:
class GeneratorClass():
def __init__(self, objClient):
self.clienteGen = objClient
def generatorDataClient(self):
amount = 0
while True:
amount += random.randint(0, 2000)
yield amount
sleep = random.choice([1,2,3])
print("sleep " + str(sleep))
time.sleep(sleep)
然后我遍历它,它有效:current_mean()
每次生成新数据时它都会执行该方法。
def iterate_clients(pos):
genobject4 = GeneratorClass(client_list[pos])
generator4 = genobject4.generatorDataClient()
current_client = genobject1.default_client
account1 = current_client.account
cnt = 0
acc_mean = 0
for item in generator4:
#We call a function previously defined
acc_mean, cnt = account1.current_mean(acc_mean, item, cnt)
print("media : " + str(acc_mean), str(cnt))
# iterate_clients(2)
它工作,你给它一个有效的客户端,它开始执行生成操作,这是一个移动平均线,并且由于它是用 a 定义的,While: True
它不会停止。
现在我想并行化这个并且我设法让它工作,但只有一次:
names = ["James", "Anna"]
client_list = [Cliente(name) for name in names]
array_length = len(client_list)
import multiprocessing
if __name__ == '__main__':
for i in range(array_length):
p = multiprocessing.Process(target=iterate_clients, args=(i,))
p.start()
但相反,每个进程开始,只重复一次,然后停止。结果如下:
calling object with ID: 140199258213624
calling the generator
moving average : 4622.0 1
calling object with ID: 140199258211160
sleep 2
calling the generator
moving average : 8013.0 1
sleep 1
我确信代码可以改进,但我可能错过了一些关于如何并行化这个问题的信息吗?
编辑:
多亏了这个答案,我尝试将循环从更改for i in range(array_length):
为while True:
我得到了一些新的东西:
calling object 140199258211160
calling the generator
calling object 140199258211160
moving average : 7993.0 1
calling the generator
duerme 3
calling object 140199258211160
calling the generator
calling object 140199258211160
moving average : 8000.0 1
moving average : 7869.0 1
duerme 3
calling the generator
它永远不会停止。所以从这里我知道我犯了一个巨大的错误,因为只创建了一个进程,并且它似乎具有竞争条件,因为移动平均线来回移动并且它只在正常过程中上升。
解决方案
这里的问题可能是子进程在主进程完成后终止,因此它们永远没有机会完全运行。
使用.join()
将使主进程等待子进程。
...
import multiprocessing
procs = []
if __name__ == '__main__':
for i in range(array_length):
p = multiprocessing.Process(target=iterate_clients, args=(i,))
p.start()
procs.append(p) # Hold a reference to each child process
for proc in procs:
proc.join() # Wait for each child process
推荐阅读
- spring - Spring,JPA,Hibernate,按预期数量递增/递减变量
- python - 如何处理python中的巨型列表/矩阵?(不可分块)
- reactjs - 什么是 Apollo 中的片段匹配器
- sql - CAST/CONVERT 问题 算术溢出错误
- c# - 如何使用 Xamarin.Essentials 将 PDF 作为 EmailAttachment 附加?
- azure - 如何使用 azure 流分析查询语言更新 cosmos db 中的值?
- javascript - JavaScript 地图对象类型
- c++ - 如何从静态 c 和 c++ 库创建共享 c 库?
- java - 检测并从 JSON 对象中删除冗余节点,节省核心结构
- c++ - 在链接的程序集文件中,我想从 C++ 调用代码中访问一个变量。可以在不触发访问冲突的情况下执行此操作吗?