python-3.x - Python异步队列没有完成
问题描述
我有一个生产者和三个消费者。每个消费者在继续之前等待获取全局锁。程序运行并没有完成并退出while循环。你能告诉我哪里出错了吗?
import asyncio
import random
async def produce(queue, n):
for x in range(1, n + 1):
# produce an item
print('producing {}/{}'.format(x, n))
# simulate i/o operation using sleep
await asyncio.sleep(random.random())
item = str(x)
# put the item in the queue
await queue.put(item)
# indicate the producer is done
await queue.put(None)
async def consume(queue, lock):
while True:
item = await queue.get()
if item is None:
# the producer emits None to indicate that it is done
break
# wait for an item from the producer
async with lock:
# process the item
print('consuming item {}...'.format(item))
# simulate i/o operation using sleep
await asyncio.sleep(0.3)
loop = asyncio.get_event_loop()
lock = asyncio.Lock()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumers = []
for _ in range(3):
consumers.append(consume(queue, lock))
all_coroutines = []
all_coroutines.append(producer_coro)
all_coroutines.extend(consumers)
loop.run_until_complete(asyncio.wait(all_coroutines))
loop.close()
解决方案
问题出在消费者身上:
if item is None:
# the producer emits None to indicate that it is done
break
哨兵None
仅由单个消费者拾取,其余的则等待下一个值通过队列到达。一个简单的解决方法是将标记值返回到队列中:
if item is None:
# The producer emits None to indicate that it is done.
# Propagate it to other consumers and quit.
await queue.put(None)
break
或者,produce
可以将与消费者一样多None
的哨兵加入队列——但这需要生产者知道有多少消费者,这并不总是可取的。
推荐阅读
- javascript - 用于识别 URL 并使它们可点击的 JavaScript 函数
- docker-compose - cookie 设置为安全,但您的重定向 url 在 keycloak-gatekeeper 中是非 tls 错误
- html - 通过 API 将数据导入 H2
- android - 如何使 NavigationDrawer 中的单个项目具有不同颜色的文本?
- c# - 基于 C# 中的条件语句创建预处理器指令
- php - 如何在 PHP 中从键数组创建多级数组
- python - 基于python中数据框中的列值循环文本数据
- javascript - Array.some() 不推入空数组
- javascript - Vue 3 和 TipTap Editor 在全局范围内传递
- spring - hibernate.cfg.xml 无法解析为 URL,因为它不存在。NetBeans Spring MVC Hibernate Web 应用程序错误