首页 > 解决方案 > 如何将 FileTransport 干净地添加到 Asyncio?

问题描述

我正在编写一个读取文本数据并对其进行操作的应用程序。文本数据可能来自 TCP 端口,或来自文本文件(其中包含先前从 TCP 端口读取并存档的数据)。我正在用 Python 3 编写它,使用asyncio似乎是显而易见的工具。

使用Streams APIopen_connection()打开 TCP 端口并从中读取非常简单。asyncio架构具有传输协议的概念,用于输入输出的下层和上层。所以,看来我应该实现一个传输来从文件中读取文本,并将其传递给协议。这将使我的应用程序的其余部分与文本数据是来自 TCP 端口还是来自文件分离。

但是我很难弄清楚如何告诉asyncio使用我喜欢的 Transport。

我是否错过了asyncio让我告诉它使用什么 Transport 的地方?或者asyncio真的编码到它的根源以使用它自己的 TCP 端口和 UDP 数据报传输,而没有别的?

如果我想允许使用我自己的 Transport 和asyncio的可能性,看起来我必须扩展事件循环,或者编写更灵活的替代方案create_connection(),编码为特定的事件循环实现。这似乎是很多工作,并且容易受到实施变化的影响。

或者,使用 Transport 处理文件输入是一件愚蠢的事吗?我是否应该构建我的代码来表示:

if (using_tcp_port): await asyncio.open_connection(....) else: completely_different_file_implementation(....)

标签: pythonpython-asyncio

解决方案


根据API的文档,它采用协议并创建传输,即TCP连接。所以它不应该是自定义传输的 API。 create_connection()

但是,为 TCP 传输或自定义文件传输重用相同协议的想法是有效的。它不会是“完全不同的实现”,但至少不会使用create_connection(). 假设它是read_file()

def my_protocol_factory():
    return your_protocol

if using_tcp_port:
    transport, protocol = await loop.create_connection(my_protocol_factory, host, port)
else:
    transport, protocol = await read_file(loop, my_protocol_factory, path_to_file)

然后你会有这样的事情:

from asyncio import transports

import aiofiles  # https://github.com/Tinche/aiofiles


def read_file(loop, protocol_factory, path):
    protocol = protocol_factory()
    transport = FileTransport(path, loop)
    transport.set_protocol(protocol)
    return transport, protocol


class FileTransport(transports.ReadTransport):
    def __init__(self, path, loop):
        super().__init__()
        self._path = path
        self._loop = loop
        self._closing = False

    def is_closing(self):
        return self._closing

    def close(self):
        self._closing = True

    def set_protocol(self, protocol):
        self._protocol = protocol
        self._loop.create_task(self._do_read())

    def get_protocol(self):
        return self._protocol

    async def _do_read(self):
        try:
            async with aiofiles.open(self._path) as f:
                self._loop.call_soon(self._protocol.connection_made, self)
                async for line in f:
                    self._loop.call_soon(self._protocol.data_received, line)
                    if self._closing:
                        break
                self._loop.call_soon(self._protocol.eof_received)
        except Exception as ex:
            self._loop.call_soon(self._protocol.connection_lost, ex)
        else:
            self._loop.call_soon(self._protocol.connection_lost, None)

推荐阅读