首页 > 解决方案 > 在所有任务启动之前恢复异步任务

问题描述

在此处的示例代码中,首先启动所有异步任务。之后,如果 IO 操作完成,任务将恢复。

输出如下所示,您可以在前 6 条启动消息之后看到 6 条结果消息。

-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.

但是我所期望的以及在我的情况下会加快速度的是这样的

-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
1938204 size for https://www.b-i-t-online.de/bitrss.xml
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
38697 size for https://jamanetwork.com/rss/site_3/67.xml
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.

在我的真实世界代码中,我有数百个这样的下载任务。通常,一些下载在所有下载开始之前就完成了。

有没有办法处理这个asyncio

这是一个最小的工作示例:

#!/usr/bin/env python3
import random
import urllib.request
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor()
loop = asyncio.get_event_loop()
urls = ['https://www.b-i-t-online.de/bitrss.xml',
        'https://jamanetwork.com/rss/site_3/67.xml',
        'http://twitrss.me/twitter_user_to_rss/?user=cochranecollab']

async def parse_one_url(u):
    print('-- Starting {}...'.format(u))
    r = await loop.run_in_executor(executor,
                                   urllib.request.urlopen, u)
    r = '{} size for {}'.format(len(r.read()), u)
    print(r)

async def do_async_parsing():
    tasks = [
        parse_one_url(u)
        for u in urls
            ]

    completed, pending = await asyncio.wait(tasks)
    results = [task.result() for task in completed]

    print('FINISHED with {} results from {} tasks.'
          .format(len(results), len(tasks)))

if __name__ == '__main__':
    # blow up the urls
    urls = urls * 2
    random.shuffle(urls)
    try:
        #loop.set_debug(True)
        loop.run_until_complete(do_async_parsing())
    finally:
        loop.close()

附带问题asyncio在我的情况下没有用吗?只使用多线程不是更容易吗?

标签: python-3.xpython-asyncio

解决方案


在我的真实世界代码中,我有数百个这样的下载任务。通常,一些下载在所有下载开始之前就完成了。

好吧,您确实预先创建了所有下载,并指示 asyncio 使用asyncio.wait. 刚开始执行协程几乎是免费的,因此没有理由对这部分进行任何限制。但是,实际提交的任务限制为池中的工作人员数量,默认为 CPU 数量的 5 倍,但可配置。如果 URL 的数量超过工作人员的数量,您应该会获得所需的行为。(但要真正观察它,您需要将日志打印移动到执行程序管理的函​​数中。)ThreadPoolExecutor

请注意,同步调用也r.read()必须驻留在执行器运行的函数内,否则会阻塞整个事件循环。代码的更正部分如下所示:

def urlopen(u):
    print('-- Starting {}...'.format(u))
    r = urllib.request.urlopen(u)  # blocking call
    content = r.read()             # another blocking call
    print('{} size for {}'.format(len(content), u))

async def parse_one_url(u):
    await loop.run_in_executor(executor, urlopen, u)

然而,以上并不是 asyncio 的惯用用法。通常的想法是您根本不使用线程,而是调用本机异步代码,例如使用aiohttp。然后,您将获得 asyncio 的好处,例如工作取消和大量任务的可伸缩性。在该设置中,您可以通过将检索简单地包装在asyncio.Semaphore.

如果您的整个实际逻辑由同步调用组成,那么您根本不需要 asyncio;您可以直接将期货提交给执行器并使用concurrent.futures同步功能,例如wait()as_completed等待它们完成。


推荐阅读