首页 > 解决方案 > 为什么工人停止在游泳池内工作?

问题描述

为了并行处理某些数据,我想创建 2 个不同的工作人员。它们各自使用不同的数据源。因此,我确实使用一个工人的工人池单独称呼他们。(见代码)

我想为每个人单独使用一个队列来获取输入数据。第一个贯穿工作正常。但是如果我为工人添加更多工作,工人不会处理它们。您还可以看到工作列表越来越大。

我在这里做错了什么?所需的输出将是我放入队列中的每个作业的打印。

您可以检查下面用 Python 3.7 编写的代码,它应该会重现该错误。

额外问题:如果我删除主要部分中的“time.sleep(0.1)” ,并在 PowerShell 中运行脚本,脚本会在打印第二个工作人员的结果之前完成。如何在此处使用 .join 以等待进程运行?

# Creates workers
def worker1(q):
    while True:
        item = q.get(True)
        print('I am Worker 1: I started working, len of the queue is : ' + str(q.qsize()))
        print(item)

        # Simulating a long calculation
        time.sleep(1)


def worker2(q):
    while True:
        item = q.get(True)
        print('I am Worker 2: I started working, len of the queue is :  ' + str(q.qsize()))
        print(item)

        # Simulating a long calculation
        time.sleep(1)


if __name__ == '__main__':

    # Configs
    n_workers = 2

    # Define Queues
    q1 = multiprocessing.Manager().Queue()
    q2 = multiprocessing.Manager().Queue()

    the_queue = [q1, q2]
    the_pool = []
    workers = [worker1, worker2]

    # Creating one worker for each Queue
    for i in range(n_workers):
        the_pool.append(multiprocessing.Pool(1, workers[i],(the_queue[i],)))

    # First Run
    for i in range(n_workers):
        the_queue[i].put("hello " + str(i))
        print('I just put in a value, len of the queue is :  ' + str(the_queue[i].qsize()))

    # Give some time before adding new tasks
    time.sleep(0.1)

    # Next Runs
    for ii in range(10):
        for i in range(n_workers):
            the_queue[i].put("hello " + str(i))
            print('I just put in a value, len of the queue is :   ' + str(the_queue[i].qsize()))

标签: pythonmultiprocessingpool

解决方案


推荐阅读