首页 > 解决方案 > 多进程生成器方法仅对每个进程工作一次

问题描述

我有一个看起来像这样的生成器:

class GeneratorClass():
    def __init__(self, objClient):
        self.clienteGen = objClient

    def generatorDataClient(self):
        amount = 0
        while True:
            amount += random.randint(0, 2000)
            yield amount
            sleep = random.choice([1,2,3])
            print("sleep " + str(sleep))
            time.sleep(sleep)

然后我遍历它,它有效:current_mean()每次生成新数据时它都会执行该方法。

def iterate_clients(pos):
    genobject4 = GeneratorClass(client_list[pos])
    generator4 = genobject4.generatorDataClient()    
    current_client = genobject1.default_client
    account1 = current_client.account
    cnt = 0
    acc_mean = 0

    for item in generator4:
        #We call a function previously defined
        acc_mean, cnt = account1.current_mean(acc_mean, item, cnt)
        print("media : " + str(acc_mean), str(cnt))

# iterate_clients(2)

它工作,你给它一个有效的客户端,它开始执行生成操作,这是一个移动平均线,并且由于它是用 a 定义的,While: True它不会停止。

现在我想并行化这个并且我设法让它工作,但只有一次:

names = ["James", "Anna"]
client_list = [Cliente(name) for name in names]
array_length = len(client_list)


import multiprocessing
if __name__ == '__main__':
  for i in range(array_length):
    p = multiprocessing.Process(target=iterate_clients, args=(i,))
    p.start()

但相反,每个进程开始,只重复一次,然后停止。结果如下:

calling object with ID: 140199258213624
calling the generator
moving average : 4622.0 1
calling object with ID: 140199258211160
sleep 2
calling the generator
moving average : 8013.0 1
sleep 1

我确信代码可以改进,但我可能错过了一些关于如何并行化这个问题的信息吗?

编辑:

多亏了这个答案,我尝试将循环从更改for i in range(array_length):while True:

我得到了一些新的东西:

calling object 140199258211160
calling the generator
calling object 140199258211160
moving average : 7993.0 1
calling the generator
duerme 3
calling object 140199258211160
calling the generator
calling object 140199258211160
moving average : 8000.0 1
moving average : 7869.0 1
duerme 3
calling the generator

它永远不会停止。所以从这里我知道我犯了一个巨大的错误,因为只创建了一个进程,并且它似乎具有竞争条件,因为移动平均线来回移动并且它只在正常过程中上升。

标签: pythonpython-3.xconcurrencymultiprocessinggenerator

解决方案


这里的问题可能是子进程在主进程完成后终止,因此它们永远没有机会完全运行。

使用.join()将使主进程等待子进程。

...

import multiprocessing

procs = []

if __name__ == '__main__':
  for i in range(array_length):
    p = multiprocessing.Process(target=iterate_clients, args=(i,))
    p.start() 
    procs.append(p)  # Hold a reference to each child process

for proc in procs:
  proc.join()  # Wait for each child process

推荐阅读