首页 > 解决方案 > yield from 未与 future 一起使用

问题描述

我正在使用requests-threads运行以下命令:

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def _perform_requests():
        for request in prepared_requests:
            results.append(session.request(**request))
        for i, result in enumerate(results):
            results[i] = await asyncio.ensure_future(results[i])

    session.run(_perform_requests)
    return results

但是,当我运行它时,会发生一些奇怪的事情,首先我会收到大量消息,例如:

(WARNING) Connection pool is full, discarding connection:

其次,我收到此错误:

results[i] = await asyncio.ensure_future(results[i])
builtins.AssertionError: yield from wasn't used with future

我在用ensure_future(),怎么回事?

标签: pythonpython-asyncio

解决方案


session.request()方法返回一个扭曲的Deferred对象(requests-threads代码调用twisted.internet.threads.deferToThread())。您通常不想将其视为 asyncio 任务,而不是在 Twisted reactor 下运行。

相反,您可以使用twisted.internet.deferred.gatherResults()并发执行请求并收集响应。

接下来,session.run()调用twisted.internet.task.react(),它将始终退出 Python

[...] 此功能还将:

[...]

  • 完成后退出应用程序,退出代码 0 表示成功,1 表示失败。

(粗体强调我的)。

这意味着即使您的代码有效,return results也永远不会到达该行。

如果您将调用移出成为session.run()应用程序的顶级入口点,那么一切正常:

from requests_threads import AsyncSession
from twisted.internet import defer

session = AsyncSession(n=100)

async def perform_requests():
    prepared_requests = [...]
    requests = [session.request(**request) for request in prepared_requests]
    responses = await defer.gatherResults(requests)
    print(responses)        

session.run(perform_requests)

但在打印responses列表后立即退出。

否则,您将不得不直接管理扭曲的反应器(使用和一旦响应完成reactor.run()调用的回调);reactor.stop()例如:

from requests_threads import AsyncSession
from twisted.internet import defer, error, reactor

def perform_requests():
    prepared_requests = [...]
    session = AsyncSession(n=100)
    results = []

    async def gather_responses():
        requests = [session.request(**request) for request in prepared_requests]
        results[:] = await defer.gatherResults(requests)
        try:
            reactor.stop()
        except error.ReactorNotRunning:
            pass

    deferred = defer.ensureDeferred(gather_responses())
    reactor.run()
    return results

print(perform_requests())

如果您需要在扭曲的反应器上运行多个任务,您可以使用单个顶级函数并依靠回调来让您知道响应何时完成。

就个人而言,我认为您最好使用该aiohttp.client模块asyncio在 Python事件循环下运行异步请求:

import asyncio
import aiohttp

async def perform_requests():
    prepared_requests = [...]
    conn = aiohttp.TCPConnector(limit=100)

    with aiohttp.ClientSession(connector=conn) as session:
        requests = [session.request(**request) for request in prepared_requests]
        responses = await asyncio.gather(*requests)

        print(responses)

if __name__ == '__main__':
    asyncio.run(perform_requests())

请注意,asyncio.run()需要 Python 3.7 或更高版本;您的错误消息表明您仍在使用 3.5 或 3.6。一种解决方法是使用loop.run_until_complete()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(perform_requests())

推荐阅读