首页 > 解决方案 > 多处理程序(生产者-消费者)退出而不打印任何东西 Python 3

问题描述

我正在尝试创建共享队列,从中执行任务并推送执行。但是,程序不打印任何内容并以代码 0 退出。

我正在使用 python 3.6 并尝试了我在互联网上找到的所有内容,但它不起作用。也许我错过了什么或做错了什么。请指出正确的方向。

import multiprocessing as mp
import time


def produce(i, rate, taskQue):
    print("+++ Producer:%s +++" % i)
    time.sleep(0.01)
    for r in range(rate):
        taskQue.put(0)
    time.sleep(1)


def consume(i, rate, taskQue):
    print("--- Consumer:%s ---" % i)
    for r in range(rate):
        while taskQue.empty():
            print("| Consumer:%s ..." % i)
            time.sleep(0.5)
        time.sleep(0.01)
        taskQue.get()
    time.sleep(1)


if __name__ == '__main__':
    manager = mp.Manager()
    taskQue = manager.Queue()

    producerDetails = [[1, 5, taskQue], [2, 7, taskQue], [3, 2, taskQue], [4, 3, taskQue]]
    producerPool = mp.Pool(processes=5)
    produced = producerPool.apply_async(produce, producerDetails)

    consumerDetails = [[1, 5, taskQue], [2, 5, taskQue], [3, 3, taskQue], [4, 5, taskQue]]
    consumerPool = mp.Pool(processes=5)
    consumed = consumerPool.apply_async(consume, consumerDetails)

    producerPool.close()
    producerPool.join()
    consumerPool.close()
    consumerPool.join()

标签: pythonparallel-processingprocessmultiprocessing

解决方案


我已经弄清楚我做错了什么。问题是 pool worker 不能接受在produce()和中指定的参数consume()。每个都需要三个参数。但是,producerDetails列表consumerDetails直接传递给Pool.apply_async()映射列表作为子列表中的一个而不是三个单独的参数。

为此,有一些Pool.starmap()函数Pool.starmap_async()可以正确接受当前列表和映射参数。

如果有人不明白我上面的意思,这是 Python 3.6 中的工作代码

import multiprocessing as mp
import time


def produce(i, rate, taskQue):
    for r in range(rate):
        print("+++ Producer:%s +++" % i)
        time.sleep(i * 0.01)
        taskQue.put(0)
    time.sleep(1)


def consume(i, rate, taskQue):
    for r in range(rate):
        while taskQue.empty():
            print("| Consumer:%s ..." % i)
            time.sleep(0.5)
        print("--- Consumer:%s ---" % i)
        time.sleep(i*0.01)
        taskQue.get()
    time.sleep(1)

if __name__ == '__main__':
    manager = mp.Manager()
    taskQue = manager.Queue()

    producerDetails = [[1, 5, taskQue], [2, 7, taskQue], [3, 2, taskQue], [4, 3, taskQue]]*50     
    producerPool = mp.Pool(processes=20)

    consumerDetails = [[1, 5, taskQue], [2, 5, taskQue], [3, 3, taskQue], [4, 5, taskQue]]*50
    consumerPool = mp.Pool(processes=20)

    produced = producerPool.starmap_async(produce, producerDetails)
    consumed = consumerPool.starmap_async(consume, consumerDetails)

    producerPool.close()
    producerPool.join()
    consumerPool.close()
    consumerPool.join()

推荐阅读