python - 如何将 FileTransport 干净地添加到 Asyncio?
问题描述
我正在编写一个读取文本数据并对其进行操作的应用程序。文本数据可能来自 TCP 端口,或来自文本文件(其中包含先前从 TCP 端口读取并存档的数据)。我正在用 Python 3 编写它,使用asyncio似乎是显而易见的工具。
使用Streams APIopen_connection()
打开 TCP 端口并从中读取非常简单。asyncio架构具有传输和协议的概念,用于输入输出的下层和上层。所以,看来我应该实现一个传输来从文件中读取文本,并将其传递给协议。这将使我的应用程序的其余部分与文本数据是来自 TCP 端口还是来自文件分离。
但是我很难弄清楚如何告诉asyncio使用我喜欢的 Transport。
- Streams API
open_connection()
有一个关于 TCP 端口传输的参数列表,无法指定不同的传输,更不用说像文件路径这样的参数了。 open_connection()
转身打电话loop.create_connection()
。这就像专门用于 TCP 端口传输一样。现在仍然提供不同的传输方式。- 的实现从或
loop.create_connection()
获取其 Transport 对象。这些在和中具有替代实现,因此我们显然已经过了应该选择文件传输的地步。self._make_ssl_transport()
self._make_socket_transport()
asyncio.selector_events.BaseSelectorEventLoop
asyncio.proactor_events.BaseProactorEventLoop
我是否错过了asyncio让我告诉它使用什么 Transport 的地方?或者asyncio真的编码到它的根源以使用它自己的 TCP 端口和 UDP 数据报传输,而没有别的?
如果我想允许使用我自己的 Transport 和asyncio的可能性,看起来我必须扩展事件循环,或者编写更灵活的替代方案create_connection()
,编码为特定的事件循环实现。这似乎是很多工作,并且容易受到实施变化的影响。
或者,使用 Transport 处理文件输入是一件愚蠢的事吗?我是否应该构建我的代码来表示:
if (using_tcp_port):
await asyncio.open_connection(....)
else:
completely_different_file_implementation(....)
解决方案
根据API的文档,它采用协议并创建流传输,即TCP连接。所以它不应该是自定义传输的 API。 create_connection()
但是,为 TCP 传输或自定义文件传输重用相同协议的想法是有效的。它不会是“完全不同的实现”,但至少不会使用create_connection()
. 假设它是read_file()
:
def my_protocol_factory():
return your_protocol
if using_tcp_port:
transport, protocol = await loop.create_connection(my_protocol_factory, host, port)
else:
transport, protocol = await read_file(loop, my_protocol_factory, path_to_file)
然后你会有这样的事情:
from asyncio import transports
import aiofiles # https://github.com/Tinche/aiofiles
def read_file(loop, protocol_factory, path):
protocol = protocol_factory()
transport = FileTransport(path, loop)
transport.set_protocol(protocol)
return transport, protocol
class FileTransport(transports.ReadTransport):
def __init__(self, path, loop):
super().__init__()
self._path = path
self._loop = loop
self._closing = False
def is_closing(self):
return self._closing
def close(self):
self._closing = True
def set_protocol(self, protocol):
self._protocol = protocol
self._loop.create_task(self._do_read())
def get_protocol(self):
return self._protocol
async def _do_read(self):
try:
async with aiofiles.open(self._path) as f:
self._loop.call_soon(self._protocol.connection_made, self)
async for line in f:
self._loop.call_soon(self._protocol.data_received, line)
if self._closing:
break
self._loop.call_soon(self._protocol.eof_received)
except Exception as ex:
self._loop.call_soon(self._protocol.connection_lost, ex)
else:
self._loop.call_soon(self._protocol.connection_lost, None)
推荐阅读
- python - pandas - 将一列转换为三列
- javascript - 如何在特定时间获取音频源的 FFT
- html - HTML 输出加载和显示滚动条的时间过长
- magento - 在页面加载时获取数据库查询日志
- react-native - 如何使用 eslint 设置内联样式规则?
- discord.py - discord.py 如何使用 client.login 而不是 client.run
- r - 如何使列中的值成为具有相应值的单独列?
- python - python, mongo and marshmallow: datetime struggles
- sql - SQL Server 客户端字符串到(强制)整数转换问题
- python - 如何将我的模拟刷新到开始?