首页 > 解决方案 > Asyncio 测试一个使用 loop.call_soon_threadsafe 的函数

问题描述

我有以下功能:

def send_command(self, cmd):
    self.loop.call_soon_threadsafe(
        functools.partial(
            self._transport.write, str(cmd).encode() + b"\n"
        )
    )

被测系统 (sut) 是一个继承自asyncio.Protocol它的类,它向套接字上的硬件发送一些命令。我必须使用线程,因为这是 wxPython 下 GUI 的一部分。最后,如果我调用self._transport.write代码在 Linux 上运行良好,但在 Windows™ 上崩溃。

运行测试时:

@pytest.mark.asyncio
async def test_send_command(self):
    self.sut._transport = Mock()
    self.sut.send_command("ook eek")
    assert self.sut._transport.write.called is True

我得到一个断言错误。self.sut._transport.write永远不会被调用。如果我self._transport.write直接在函数中调用,代码会在 Windows™ 上崩溃,但测试通过就好了。

我在这里想念什么?

任何人?…

当然,这不是一个极端情况……</p>

标签: python-3.xlinuxwindowspython-asynciopython-multithreading

解决方案


解决方法……</h1>

阅读并尝试了 even loops之后,使用这个:

import asyncio
import selectors

selector = selectors.SelectSelector()
loop = asyncio.SelectorEventLoop(selector)
asyncio.set_event_loop(loop)

绕过问题。当然,这意味着在 Windows 上使用效率较低的循环。☹PHA!

欢迎任何有更好解决方案的人访问一些虚假的互联网点。


推荐阅读