python - Python Asyncio - RuntimeError: Cannot close a running event loop
问题描述
I'm trying to resolve this error: RuntimeError: Cannot close a running event loop
in my asyncio process. I believe it's happening because there's a failure while tasks are still pending, and then I try to close the event loop. I'm thinking I need to await the remaining responses prior to closing the event loop, but I'm not sure how to accomplish that correctly in my specific situation.
def start_job(self):
if self.auth_expire_timestamp < get_timestamp():
api_obj = api_handler.Api('Api Name', self.dbObj)
self.api_auth_resp = api_obj.get_auth_response()
self.api_attr = api_obj.get_attributes()
try:
self.queue_manager(self.do_stuff(json_data))
except aiohttp.ServerDisconnectedError as e:
logging.info("Reconnecting...")
api_obj = api_handler.Api('API Name', self.dbObj)
self.api_auth_resp = api_obj.get_auth_response()
self.api_attr = api_obj.get_attributes()
self.run_eligibility()
async def do_stuff(self, data):
tasks = []
async with aiohttp.ClientSession() as session:
for row in data:
task = asyncio.ensure_future(self.async_post('url', session, row))
tasks.append(task)
result = await asyncio.gather(*tasks)
self.load_results(result)
def queue_manager(self, method):
self.loop = asyncio.get_event_loop()
future = asyncio.ensure_future(method)
self.loop.run_until_complete(future)
async def async_post(self, resource, session, data):
async with session.post(self.api_attr.api_endpoint + resource, headers=self.headers, data=data) as response:
resp = []
try:
headers = response.headers['foo']
content = await response.read()
resp.append(headers)
resp.append(content)
except KeyError as e:
logging.error('KeyError at async_post response')
logging.error(e)
return resp
def shutdown(self):
//need to do something here to await the remaining tasks and then I need to re-start a new event loop, which i think i can do, just don't know how to appropriately stop the current one.
self.loop.close()
return True
How can I handle the error and properly close the event loop so I can start a new one and essentially re-boot the whole program and continue on.
EDIT:
This is what I'm trying now, based on this SO answer. Unfortunately, this error only happens rarely, so unless I can force it, i will have to wait and see if it works. In my queue_manager
method I changed it to this:
try:
self.loop.run_until_complete(future)
except Exception as e:
future.cancel()
self.loop.run_until_complete(future)
future.exception()
UPDATE:
I got rid of the shutdown()
method and added this to my queue_manager()
method instead and it seems to be working without issue:
try:
self.loop.run_until_complete(future)
except Exception as e:
future.cancel()
self.check_in_records()
self.reconnect()
self.start_job()
future.exception()
解决方案
要回答最初所说的问题,不需要close()
运行循环,您可以为整个程序重用相同的循环。
鉴于更新中的代码,您queue_manager
可能如下所示:
try:
self.loop.run_until_complete(future)
except Exception as e:
self.check_in_records()
self.reconnect()
self.start_job()
取消future
是没有必要的,据我所知没有任何效果。这与专门对 做出反应的引用答案不同KeyboardInterrupt
,特别是因为它是由 asyncio 本身提出的。KeyboardInterrupt
可以在run_until_complete
没有实际完成的情况下传播。在 asyncio 中正确处理Ctrl-C是非常困难的,甚至是不可能的(详见此处),但幸运的是,问题根本不是关于Ctrl-C,而是关于协程引发的异常。(请注意,KeyboardInterrupt
它不继承自Exception
,因此在Ctrl-Cexcept 主体的情况下甚至不会执行。)
我取消了未来,因为在这种情况下还有剩余的任务待处理,我想从本质上删除这些任务并开始一个新的事件循环。
这是一件正确的事情,但是(更新的)问题中的代码只是取消了一个未来,即已经传递给run_until_complete
. 回想一下,future 是稍后提供的结果值的占位符。一旦提供了值,就可以通过调用来检索它future.result()
。如果未来的“价值”是异常,future.result()
将引发该异常。run_until_complete
有一个约定,只要给定的未来产生一个值,它就会运行事件循环,然后它返回那个值。如果“值”实际上是一个异常要提高,那么run_until_complete
将重新提高它。例如:
loop = asyncio.get_event_loop()
fut = loop.create_future()
loop.call_soon(fut.set_exception, ZeroDivisionError)
# raises ZeroDivisionError, as that is the future's result,
# manually set
loop.run_until_complete(fut)
当所讨论的未来实际上是 aTask
时,一个将协程包装到 a 中的异步特定对象,Future
这种未来的结果是协程返回的对象。如果协程引发异常,则检索结果将重新引发异常,因此run_until_complete
:
async def fail():
1/0
loop = asyncio.get_event_loop()
fut = loop.create_task(fail())
# raises ZeroDivisionError, as that is the future's result,
# because the coroutine raises it
loop.run_until_complete(fut)
在处理任务时,run_until_complete
finishing 意味着协程也完成了,要么返回值,要么引发异常,由run_until_complete
返回或引发来确定。
另一方面,取消任务的工作方式是安排要恢复的任务和await
暂停它的表达式 raise CancelledError
。除非任务专门捕获并抑制此异常(行为良好的 asyncio 代码不应该这样做),否则任务将停止执行并CancelledError
成为其结果。cancel()
但是,如果协程在被调用时已经完成,则cancel()
无法执行任何操作,因为没有待await
注入的待处理CancelledError
。
推荐阅读
- c++ - 在 std::Vector 中重新迭代时,Shared_ptr 为空
- xamarin.forms - 如何更改 Xamarin 表单中占位符的文本颜色
- php - 如果匹配,则提取带括号的字符串(正则表达式)
- angular - 无法在 Ubuntu 18.04 上安装 Angular cli
- maven - Jenkins阶段maven安装被跳过
- c# - 如何在 C# 中检查 yyyyMMddHHmmss 格式的字符串?
- c# - 使用 C# 将 SharePoint 库 XML 文档转换为 PDF 文档
- webpack - 添加 Bootstrap 后,Webpack Encore 平均编译时间从 160ms 下降到 270ms
- javascript - 状态正在更新,但图像未呈现
- r - 在一个语句中计算多个移动计算