python - yield from 未与 future 一起使用
问题描述
我正在使用requests-threads运行以下命令:
def perform_requests():
prepared_requests = [...]
session = AsyncSession(n=100)
results = []
async def _perform_requests():
for request in prepared_requests:
results.append(session.request(**request))
for i, result in enumerate(results):
results[i] = await asyncio.ensure_future(results[i])
session.run(_perform_requests)
return results
但是,当我运行它时,会发生一些奇怪的事情,首先我会收到大量消息,例如:
(WARNING) Connection pool is full, discarding connection:
其次,我收到此错误:
results[i] = await asyncio.ensure_future(results[i])
builtins.AssertionError: yield from wasn't used with future
我在用ensure_future()
,怎么回事?
解决方案
该session.request()
方法返回一个扭曲的Deferred
对象(requests-threads
代码调用twisted.internet.threads.deferToThread()
)。您通常不想将其视为 asyncio 任务,而不是在 Twisted reactor 下运行。
相反,您可以使用twisted.internet.deferred.gatherResults()
并发执行请求并收集响应。
接下来,session.run()
调用twisted.internet.task.react()
,它将始终退出 Python:
[...] 此功能还将:
[...]
- 完成后退出应用程序,退出代码 0 表示成功,1 表示失败。
(粗体强调我的)。
这意味着即使您的代码有效,return results
也永远不会到达该行。
如果您将调用移出成为session.run()
应用程序的顶级入口点,那么一切正常:
from requests_threads import AsyncSession
from twisted.internet import defer
session = AsyncSession(n=100)
async def perform_requests():
prepared_requests = [...]
requests = [session.request(**request) for request in prepared_requests]
responses = await defer.gatherResults(requests)
print(responses)
session.run(perform_requests)
但在打印responses
列表后立即退出。
否则,您将不得不直接管理扭曲的反应器(使用和一旦响应完成reactor.run()
调用的回调);reactor.stop()
例如:
from requests_threads import AsyncSession
from twisted.internet import defer, error, reactor
def perform_requests():
prepared_requests = [...]
session = AsyncSession(n=100)
results = []
async def gather_responses():
requests = [session.request(**request) for request in prepared_requests]
results[:] = await defer.gatherResults(requests)
try:
reactor.stop()
except error.ReactorNotRunning:
pass
deferred = defer.ensureDeferred(gather_responses())
reactor.run()
return results
print(perform_requests())
如果您需要在扭曲的反应器上运行多个任务,您可以使用单个顶级函数并依靠回调来让您知道响应何时完成。
就个人而言,我认为您最好使用该aiohttp.client
模块asyncio
在 Python事件循环下运行异步请求:
import asyncio
import aiohttp
async def perform_requests():
prepared_requests = [...]
conn = aiohttp.TCPConnector(limit=100)
with aiohttp.ClientSession(connector=conn) as session:
requests = [session.request(**request) for request in prepared_requests]
responses = await asyncio.gather(*requests)
print(responses)
if __name__ == '__main__':
asyncio.run(perform_requests())
请注意,asyncio.run()
需要 Python 3.7 或更高版本;您的错误消息表明您仍在使用 3.5 或 3.6。一种解决方法是使用loop.run_until_complete()
:
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(perform_requests())
推荐阅读
- firebase - 如何在 Firestore 中将多个数据设置为数组
- ios - SWIFTUI LazyVGrid 自动执行所有导航链接
- r - 如何在 R 中为不相等的组大小创建百分比堆栈图?
- nginx - Nginx 临时重定向新 url 并替换
- openstack - 我可以为 Openstack 中的租户网络使用不同的 NIC 吗?
- python - 从站点抓取配置文件时出错
- javascript - 无法使用 Angular 将表单数据发送到节点服务器
- node.js - opendir 返回 fs.Dir 可通过 promise 进行迭代,但不可通过 Sync 进行迭代。为什么?
- c - 英特尔上的 gcc / clang / msvc:为什么 ucomiss 为 QNAN == QNAN(预期),但为 QNAN <= QNAN(意外)?
- css - 使用 bootstrap4 排列复选框