首页 > 解决方案 > 异步 IO 协程如何执行?

问题描述

我正在从这里的示例中查看这段代码

我想知道消费者()协程在什么时候被调用?

import asyncio
import itertools as it
import os
import random
import time

async def makeitem(size: int = 5) -> str:
    return os.urandom(size).hex()

async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    i = random.randint(0, 10)
    if caller:
        print(f"{caller} sleeping for {i} seconds.")
    await asyncio.sleep(i)

async def produce(name: int, q: asyncio.Queue) -> None:
    n = random.randint(0, 10)
    for _ in it.repeat(None, n):  # Synchronous loop for each single producer
        await randsleep(caller=f"Producer {name}")
        i = await makeitem()
        t = time.perf_counter()
        await q.put((i, t))
        print(f"Producer {name} added <{i}> to queue.")

async def consume(name: int, q: asyncio.Queue) -> None:
    while True:
        await randsleep(caller=f"Consumer {name}")
        i, t = await q.get()
        now = time.perf_counter()
        print(f"Consumer {name} got element <{i}>"
              f" in {now-t:0.5f} seconds.")
        q.task_done()

async def main(nprod: int, ncon: int):
    q = asyncio.Queue()
    producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]
    consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()

if __name__ == "__main__":
    import argparse
    random.seed(444)
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--nprod", type=int, default=5)
    parser.add_argument("-c", "--ncon", type=int, default=10)
    ns = parser.parse_args()
    start = time.perf_counter()
    asyncio.run(main(**ns.__dict__))
    elapsed = time.perf_counter() - start
    print(f"Program completed in {elapsed:0.5f} seconds.")

我只看到这条线触发了生产者和消费者协程的执行。

await asyncio.gather(*producers)

我不明白直到上面提到的等待行在这些行中定义和创建任务时在后台没有执行(因为没有显示生产者和消费者内部的打印语句)。:

producers = [asyncio.create_task(produce(n, q)) for n in range(nprod)]


consumers = [asyncio.create_task(consume(n, q)) for n in range(ncon)]

标签: pythonpython-3.xpython-asyncio

解决方案


虽然create_task()不会立即开始执行协程,但它会在第一个可能的机会(即暂停到事件循环的第一个)时安排在后台执行。await

gather()只是一个辅助函数,它等待给定的 awaitables 完成。它不会阻止以前调度的协程(例如以 开头的协程create_task,但也有start_server等)执行。

我想知道消费者()协程在什么时候被调用?

由于consumers是一个协程,当它被调用一次时,它可以暂停和恢复很多次,每次都await作为一个暂停/恢复点。当你调用create_task()它时,它被放置在一个可运行的协程队列中。在事件循环的每次迭代中,asyncio 都会通过可运行的协程并执行每个协程的“步骤”,其中该步骤执行它直到第一个await选择挂起。在您的代码中,当您的main协程暂停以等待gather()完成时,就会发生该步骤。


推荐阅读