python-3.x - Asyncio 测试一个使用 loop.call_soon_threadsafe 的函数
问题描述
我有以下功能:
def send_command(self, cmd):
self.loop.call_soon_threadsafe(
functools.partial(
self._transport.write, str(cmd).encode() + b"\n"
)
)
被测系统 (sut) 是一个继承自asyncio.Protocol
它的类,它向套接字上的硬件发送一些命令。我必须使用线程,因为这是 wxPython 下 GUI 的一部分。最后,如果我调用self._transport.write
代码在 Linux 上运行良好,但在 Windows™ 上崩溃。
运行测试时:
@pytest.mark.asyncio
async def test_send_command(self):
self.sut._transport = Mock()
self.sut.send_command("ook eek")
assert self.sut._transport.write.called is True
我得到一个断言错误。self.sut._transport.write
永远不会被调用。如果我self._transport.write
直接在函数中调用,代码会在 Windows™ 上崩溃,但测试通过就好了。
我在这里想念什么?
任何人?…
当然,这不是一个极端情况……</p>
解决方案
解决方法……</h1>
在阅读并尝试了 even loops之后,使用这个:
import asyncio
import selectors
selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)
绕过问题。当然,这意味着在 Windows 上使用效率较低的循环。☹PHA!
欢迎任何有更好解决方案的人访问一些虚假的互联网点。
推荐阅读
- linux - Linux 将文本附加到 ifstat 输出
- wordpress - 使用 ACF load_value 挂钩预填充上传文件字段
- javascript - 刷新时加载回主页
- javascript - 克隆/复制对象时如何删除对象的只读属性?
- javascript - javascript中的警报错误。显示字符串的问题
- python - 如何使用 Jinja 将日期字符串传递到 SQL 查询中
- python - 无法使用 pip 在 Ubuntu 20.04 LTS 上安装 Tensorflow 1.x
- javascript - Throw er:如果端口和地址正确,为什么会抛出这个?
- sqlite - sqlite 和 better-sqlite3 实现之间的区别
- c# - 如何在 C# 中连接查询结果