首页 > 解决方案 > 为什么 asyncio 在 create_task 上运行的流上立即抛出 CancelledError

问题描述

我正在使用 asyncio 和流进行测试,试图理解它,但我遇到了一个问题。

我要做的是打开与服务器的连接并为每个流、读取器和写入器创建一个协程,这样我就可以与服务器独立交互。我可以独立发送和接收消息。这是因为一旦连接打开,服务器就可以向客户端发送状态消息,无论它是否请求它。

我正在尝试在 Executor 上运行 input() ,这样它就不会阻塞程序。

问题是当我运行代码时,循环立即完成协程,我没有机会发送或接收某些东西。输出在代码之后。

import argparse
import asyncio
from asyncio import StreamReader, StreamWriter

async def msg_reader(reader: StreamReader):
    try:
        while data := await reader.readline():
            print(f"\r\n{data.decode()}\r\n> ")
        
        print(f"READER - Connection Ended\r\n")
    
    except asyncio.CancelledError as cerr:
        print(f"\r\nREADER ERR - {cerr}")
        print(f'\r\nREADER - Remote connection cancelled.')
    except asyncio.IncompleteReadError:
        print(f'\r\nREADER - Remote disconnected')
    finally:
        print(f'\r\nREADER - Remote closed')

async def async_input(prompt: str = "") -> str:
    ret_str = await asyncio.get_event_loop().run_in_executor(None, input, prompt)
    return ret_str

async def msg_writer(writer: StreamWriter):
    try:
        while True:
            msg = await async_input("> ")
            print(f"got: {msg}")
            
            writer.writelines([msg.encode()])
            await writer.drain()
    
    except asyncio.CancelledError as cerr:
        print(f"\r\nWRITER ERR - {cerr}")
        print(f'WRITER - Remote connection cancelled.')
    finally:
        print(f'WRITER - Remote closed')
        writer.close()

async def main():
    parser = argparse.ArgumentParser(description = "This is the client for the multi threaded socket server!")
    parser.add_argument('--host', metavar = 'host', type = str, nargs = '?', default = "127.0.0.1")
    parser.add_argument('--port', metavar = 'port', type = int, nargs = '?', default = 25000)
    args = parser.parse_args()

    print(f"Connecting to server: {args.host} on port: {args.port}")

    reader, writer = await asyncio.open_connection(host=args.host, port=args.port)
    print(f'I am {writer.get_extra_info("sockname")}')

    asyncio.create_task(msg_reader(reader))
    asyncio.create_task(msg_writer(writer))

try:
    asyncio.run(main())
except KeyboardInterrupt:
    print('\r\nBye!')

输出是:

python3 btv_async_client.py
Connecting to server: 127.0.0.1 on port: 25000
I am ('127.0.0.1', 51990)
> 
READER ERR - 

READER - Remote connection cancelled.

READER - Remote closed

WRITER ERR - 
WRITER - Remote connection cancelled.
WRITER - Remote closed

感谢任何帮助...

标签: pythonpython-asyncio

解决方案


问题是您的main函数创建了两个后台任务,然后只返回. main返回时,通过asyncio.run(除其他外)取消所有任务来清理事件循环。如果您想main一直运行msg_reader并且msg_writer处于活动状态,则需要告诉 asyncio,例如:

async def main():
    ... setup ...

    await asyncio.gather(msg_reader(reader), msg_writer(writer))

推荐阅读