python-3.x - Python 3 - 多个 AsyncIO 连接
问题描述
我正在尝试学习如何在 Python 3.7 中使用 AsyncIO,但我仍然对其原理感到有些困惑。
我的目标是编写一个简单的聊天程序,但是我需要使用环形网络拓扑——一个节点只知道它的两个邻居。消息发送后,由节点传递,直到再次到达发送者。这意味着每个节点基本上同时是一个客户端和一个服务器。
我还需要能够检测到死节点,这样我的戒指就不会破裂。
我认为对于每个节点来说,为每个邻居建立一个单独的连接可能是一个很好的解决方案——successor
并且predecessor
.
class Node:
...
def run():
...
s = loop.create_connection(lambda: Client(...), addr1, port1)
p = loop.create_server(lambda: Server(...), addr2, port2)
successor = loop.run_until_complete(s)
predecessor = loop.run_until_complete(p)
loop.run_forever()
...
...
Server
并且Client
是实现asyncio.Protocol
.
我想这样做的原因是,如果有消息通过圆圈发送,它总是从 发送predecessor
到successor
. 在我connection_lost
的方法中,predecessor
我可以检测到它已断开连接并发送predecessor
一条消息(通过整个环)连接到我。
我希望能够将我收到的消息发送predecessor
到我的successor
. 我还希望能够向我的地址发送一条消息,successor
以防我predecessor
死了(此消息将从predecessor
's发送Server.connection_lost()
并一直传递到 my dead predecessor
's predecessor
)。
我的问题是:我可以将收到的数据从predecessor
to传递successor
吗?如果不是,那么使用 AsyncIO 和环形拓扑的这个程序的更好实现是什么?
解决方案
对于遇到同样问题的 AsyncIO 新手,我自己找到了解决方案。
首先,最好使用 AsyncIO 的高级方面—— streams
. 调用loop.create_connction
andloop.create_server
被认为是低级的(一开始我理解错了)。
create_connection
is的高级替代方案asyncio.open_connection
,它将为您提供一个元组asyncio.StreamReader
,asyncio.StreamWriter
您可以使用它来读取和写入打开的连接。当从StreamReader
equals to读取数据b''
或在ConnectionError
尝试写入StreamWriter
.
create_server
is的高级替代方案asyncio.start_server
,它需要提供一个回调函数,每次与服务器建立连接时都会调用该函数(打开连接,接收数据......)。回调有StreamReader
和StreamWriter
作为参数。连接丢失也可以通过接收b''
或ConnectionError
写入writer
.
多个连接可以由协程处理。服务器部分可以有一个协程(它接受来自环拓扑中的一个邻居的连接)和客户端部分的协程(它打开到环中另一个邻居的连接)。该类Node
可能如下所示:
import asyncio
class Node:
...
async def run(self):
...
self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
server_coro = asyncio.create_task(self.server_init())
client_coro = asyncio.create_task(self.client_method())
await client_coro
await server_coro
...
async def server_init(self):
server = await asyncio.start_server(self.server_callback, self.IP, self.port)
async with server:
await server.serve_forever()
async def client_method(self):
...
try:
data = await self.next_reader.read()
except ConnectionError:
...
...
请注意,我使用asyncio.create_task
的是协程 and (不在代码清单中) ,它们被认为是andasyncio.run(node.run())
的高级替代品。这两个都是在 Python 3.7 中添加的,据说是临时的,所以当你阅读本文时,可能已经被其他东西取代了。asyncio.ensure_future()
loop.run_forever()
asyncio.run()
我不是 AsyncIO 专家,所以可能有更好、更简洁的方法来做到这一点(如果你知道,请分享)。
推荐阅读
- django - CSRF 验证失败。请求中止 - 即使添加 {% csrf_token %}
- python - 添加特殊标记会更改所有嵌入 - TF Bert Hugging Face
- flutter - 如何在 Flutter 中退出键盘后更改小部件
- java - FXMLLoader 找不到 JavaFX 类,但可以手动创建 JavaFX 作品
- java - 试图用 Intent 在 Android Studio 中解释一个 Java 方法
- node.js - '无法获取 /api/items'
- config - tmux 在 window_status 显示窗格的所有 window_names
- acumatica - 使用语言向 dac 添加列
- windows - 如何使用 PowerShell 确认系统初始化是否完成?
- asp.net - 将值从文本框传递到操作链接