首页 > 解决方案 > Python异步队列没有完成

问题描述

我有一个生产者和三个消费者。每个消费者在继续之前等待获取全局锁。程序运行并没有完成并退出while循环。你能告诉我哪里出错了吗?

import asyncio
import random

async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print('producing {}/{}'.format(x, n))
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        item = str(x)
        # put the item in the queue
        await queue.put(item)

    # indicate the producer is done
    await queue.put(None)


async def consume(queue, lock):
    while True:
        item = await queue.get()
        if item is None:
            # the producer emits None to indicate that it is done
            break
        # wait for an item from the producer
        async with lock:
            # process the item
            print('consuming item {}...'.format(item))
            # simulate i/o operation using sleep
            await asyncio.sleep(0.3)


loop = asyncio.get_event_loop()
lock = asyncio.Lock()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumers = []
for _ in range(3):
    consumers.append(consume(queue, lock))
all_coroutines = []
all_coroutines.append(producer_coro)
all_coroutines.extend(consumers)
loop.run_until_complete(asyncio.wait(all_coroutines))
loop.close()

标签: python-3.xpython-asyncio

解决方案


问题出在消费者身上:

        if item is None:
            # the producer emits None to indicate that it is done
            break

哨兵None仅由单个消费者拾取,其余的则等待下一个值通过队列到达。一个简单的解决方法是将标记值返回到队列中:

        if item is None:
            # The producer emits None to indicate that it is done.
            # Propagate it to other consumers and quit.
            await queue.put(None)
            break

或者,produce可以将与消费者一样多None的哨兵加入队列——但这需要生产者知道有多少消费者,这并不总是可取的。


推荐阅读