asyncio.gather(*tasks) fails to await only a subset of all tasks

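Problem description

Problem to solve (simplified)

Say I have 26 tasks to run in parallel. To minimize the load on my server, I decided to run them 10 at a time: first 10 tasks in parallel, then the next 10, and finally the remaining 6.

I wrote a simple script to achieve this behavior: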

import asyncio
from string import ascii_uppercase
from typing import List

TASK_NAMES = ascii_uppercase  # 26 fake tasks in total


class BatchWorker:
    """Run a list of tasks in batch."""

    BATCH_SIZE = 10

    def __init__(self, tasks: List[asyncio.Task]):
        self._tasks = list(tasks)

    @property
    def batch_of_tasks(self):
        """Yield all tasks by chunks of `BATCH_SIZE`"""
        start = 0
        while 'there are items remaining in the list':
            end = start + self.BATCH_SIZE
            chunk = self._tasks[start:end]
            if not chunk:
                break
            yield chunk
            start = end

    async def run(self):
        print(f'Running {self.BATCH_SIZE} tasks at a time')
        for batch in self.batch_of_tasks:
            print(f'\nWaiting for {len(batch)} tasks to complete...')
            await asyncio.gather(*batch)
            print('\nSleeping...\n---')
            await asyncio.sleep(1)


async def task(name: str):
    print(f"Task '{name}' is running...")
    await asyncio.sleep(3)  # Pretend to do something


async def main():
    tasks = [
      asyncio.create_task(task(name))
      for name in TASK_NAMES
    ]
    worker = BatchWorker(tasks)
    await worker.run()


if __name__ == '__main__':
    asyncio.run(main())

What I expected

I expected the logs to look like this:

Task A is running
[...]
Task J is running
Sleeping
---
Task K is running
[...]
Task T is running
Sleeping
---
[...]

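...you get the idea.

What I actually got

However, in the first iteration the worker waits for all 26 tasks to complete, even though I asked it to gather only 10 of them. See the logs: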

Running 10 tasks at a time

Waiting for 10 tasks to complete...
Task 'A' is running...
Task 'B' is running...
Task 'C' is running...
Task 'D' is running...
Task 'E' is running...
Task 'F' is running...
Task 'G' is running...
Task 'H' is running...
Task 'I' is running...
Task 'J' is running...
Task 'K' is running...
Task 'L' is running...
Task 'M' is running...
Task 'N' is running...
Task 'O' is running...
Task 'P' is running...
Task 'Q' is running...
Task 'R' is running...
Task 'S' is running...
Task 'T' is running...
Task 'U' is running...
Task 'V' is running...
Task 'W' is running...
Task 'X' is running...
Task 'Y' is running...
Task 'Z' is running...

Sleeping...
---

Waiting for 10 tasks to complete...

Sleeping...
---

Waiting for 6 tasks to complete...

Sleeping...
---

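As you can see, there are 3 batches in total (as expected), but only the first one does anything. The remaining 2 have nothing left to do.

My questions

  1. Given that the official documentation states that .gather() will only run concurrently the awaitables provided as arguments, why does my script run all of my tasks instead of chunks of them?

  2. What else should I use to make it work the way I want?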

Tags: python, asynchronous, async-await, python-asyncio

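Solution


gather() doesn't really "run" the awaitables; it just sleeps while the event loop does its thing, and wakes up once the awaitables it received are done. What your code does is:

  1. Use asyncio.create_task() to spawn a bunch of awaitables in the background.
  2. Use asyncio.gather() to wait, batch by batch, until some of them finish.

The fact that gather() in #2 receives only a subset of the tasks created in #1 won't prevent the rest of the tasks created in #1 from happily running.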
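
To see this in isolation, here is a minimal sketch (the names `started` and `demo` are made up for illustration and are not part of the question's script). It creates five tasks, gathers only the first two, and then checks which ones have started:

```python
import asyncio

started = []  # names of tasks that have begun executing


async def task(name: str) -> None:
    started.append(name)
    await asyncio.sleep(0.01)  # pretend to do something


async def demo() -> list:
    # create_task() schedules every task on the event loop right away.
    tasks = [asyncio.create_task(task(name)) for name in "ABCDE"]
    # Only the first two tasks are gathered here...
    await asyncio.gather(*tasks[:2])
    snapshot = list(started)  # ...yet the other three have started too.
    await asyncio.gather(*tasks[2:])  # let the remaining tasks finish cleanly
    return snapshot


snapshot = asyncio.run(demo())
print(snapshot)
```

With the default event loop, the snapshot should contain all five names, because the tasks began running as soon as the loop got control, regardless of what was passed to gather().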

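To fix the problem, you must postpone calling create_task() until the last possible moment. In fact, since gather() calls ensure_future() on its arguments (and ensure_future() called with a coroutine object ends up calling create_task()), you don't need to call create_task() at all. If you remove the call to create_task() from main and pass the coroutine objects to BatchWorker (and subsequently to gather()), the tasks will be both scheduled and awaited in batches, just as you want: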

async def main():
    tasks = [task(name) for name in TASK_NAMES]
    worker = BatchWorker(tasks)
    await worker.run()
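
If you want to convince yourself that the batches now start one after another, the following is a rough self-contained sketch of the same idea. The `events` list and the `run_in_batches` helper are hypothetical names used only for this check, and the sleep is shortened so it finishes quickly:

```python
import asyncio
from string import ascii_uppercase

BATCH_SIZE = 10  # same batch size as in the question
events = []      # records task starts and batch boundaries, in order


async def task(name: str) -> None:
    events.append(name)
    await asyncio.sleep(0.01)  # pretend to do something


async def run_in_batches(names: str) -> None:
    # Plain coroutine objects: nothing is scheduled at this point.
    coros = [task(name) for name in names]
    for start in range(0, len(coros), BATCH_SIZE):
        # gather() wraps each coroutine in a task, so only this batch starts.
        await asyncio.gather(*coros[start:start + BATCH_SIZE])
        events.append("---")  # marks the end of a batch


asyncio.run(run_in_batches(ascii_uppercase))
print(events)
```

Each "---" marker should appear only after the 10 (or, for the last batch, 6) tasks before it have started and finished, which is the ordering the question was after.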
