python-3.x - 在所有任务启动之前恢复异步任务
问题描述
在此处的示例代码中,首先启动所有异步任务。之后,如果 IO 操作完成,任务将恢复。
输出如下所示,您可以在前 6 条启动消息之后看到 6 条结果消息。
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.
但是我所期望的以及在我的情况下会加快速度的是这样的
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
-- Starting https://www.b-i-t-online.de/bitrss.xml...
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
1938204 size for https://www.b-i-t-online.de/bitrss.xml
-- Starting http://twitrss.me/twitter_user_to_rss/?user=cochranecollab...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
-- Starting https://jamanetwork.com/rss/site_3/67.xml...
38697 size for https://jamanetwork.com/rss/site_3/67.xml
-- Starting https://www.b-i-t-online.de/bitrss.xml...
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
28337 size for http://twitrss.me/twitter_user_to_rss/?user=cochranecollab
1938204 size for https://www.b-i-t-online.de/bitrss.xml
38697 size for https://jamanetwork.com/rss/site_3/67.xml
FINISHED with 6 results from 6 tasks.
在我的真实世界代码中,我有数百个这样的下载任务。通常,一些下载在所有下载开始之前就完成了。
有没有办法处理这个asyncio
?
这是一个最小的工作示例:
#!/usr/bin/env python3
import random
import urllib.request
import asyncio
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor()
loop = asyncio.get_event_loop()
urls = ['https://www.b-i-t-online.de/bitrss.xml',
'https://jamanetwork.com/rss/site_3/67.xml',
'http://twitrss.me/twitter_user_to_rss/?user=cochranecollab']
async def parse_one_url(u):
print('-- Starting {}...'.format(u))
r = await loop.run_in_executor(executor,
urllib.request.urlopen, u)
r = '{} size for {}'.format(len(r.read()), u)
print(r)
async def do_async_parsing():
tasks = [
parse_one_url(u)
for u in urls
]
completed, pending = await asyncio.wait(tasks)
results = [task.result() for task in completed]
print('FINISHED with {} results from {} tasks.'
.format(len(results), len(tasks)))
if __name__ == '__main__':
# blow up the urls
urls = urls * 2
random.shuffle(urls)
try:
#loop.set_debug(True)
loop.run_until_complete(do_async_parsing())
finally:
loop.close()
附带问题:asyncio
在我的情况下没有用吗?只使用多线程不是更容易吗?
解决方案
在我的真实世界代码中,我有数百个这样的下载任务。通常,一些下载在所有下载开始之前就完成了。
好吧,您确实预先创建了所有下载,并指示 asyncio 使用asyncio.wait
. 刚开始执行协程几乎是免费的,因此没有理由对这部分进行任何限制。但是,实际提交的任务被限制为池中的工作人员数量,默认为 CPU 数量的 5 倍,但可配置。如果 URL 的数量超过工作人员的数量,您应该会获得所需的行为。(但要真正观察它,您需要将日志打印移动到执行程序管理的函数中。)ThreadPoolExecutor
请注意,同步调用也r.read()
必须驻留在执行器运行的函数内,否则会阻塞整个事件循环。代码的更正部分如下所示:
def urlopen(u):
print('-- Starting {}...'.format(u))
r = urllib.request.urlopen(u) # blocking call
content = r.read() # another blocking call
print('{} size for {}'.format(len(content), u))
async def parse_one_url(u):
await loop.run_in_executor(executor, urlopen, u)
然而,以上并不是 asyncio 的惯用用法。通常的想法是您根本不使用线程,而是调用本机异步代码,例如使用aiohttp。然后,您将获得 asyncio 的好处,例如工作取消和大量任务的可伸缩性。在该设置中,您可以通过将检索简单地包装在asyncio.Semaphore
.
如果您的整个实际逻辑由同步调用组成,那么您根本不需要 asyncio;您可以直接将期货提交给执行器并使用concurrent.futures
同步功能,例如wait()
和as_completed
等待它们完成。
推荐阅读
- java - 我应该如何描述这种创建随机对象的方法?
- asp.net-core - 令牌服务器上自定义端点的 Identity Server 4 客户端凭据
- c# - 服务参考连接问题
- sql - 如果存在重复行,则增加 SQL 数据库中的列值
- python - 尽管 df 包含 0 或 NaN,但仍显示堆积面积图
- javascript - 具有多个值的 json 对象的正确结构
- shiny - 仅获取已更改的 InputID
- python - How to get a robust nonlinear regression fit using scipy.optimize.least_squares?
- vba - VBA 中使用的命名范围
- json - 如何在 yaml 文件中使用 json 变量(Helm)