python - Twisted reactor 如何与基于试验的单元测试一起工作?
问题描述
我已经使用 Twisted 编写了一个 TCP/UDP 拦截代理,我想向它添加一些单元测试。我想设置一个回显协议,然后通过我的代理发送一些数据,然后检查返回的响应。
然而,似乎即使对于使用套接字(撇开我的拦截代理)连接到回声器的简单测试,反应器似乎也没有在之后产生setUp
- 测试永远挂起。如果我向套接字添加超时,则会引发超时异常。我什至尝试连接ncat
以确保不是手动创建的套接字的责任 - 回声器确实在监听,但我没有收到回显给ncat
客户端的数据。
我使用的测试代码如下
import pytest
import socket
from twisted.trial import unittest
from twisted.internet import reactor, protocol
class EchoTCP(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
class EchoTCPFactory(protocol.Factory):
protocol = EchoTCP
class TestTCP(unittest.TestCase):
"""Twisted has its own unittest class
https://twistedmatrix.com/documents/15.2.0/core/howto/trial.html
"""
def setUp(self):
self.iface = "127.0.0.1"
self.data = b"Hello, World!"
# Setup twised echoer
self.port = reactor.listenTCP(
8080,
EchoTCPFactory(),
interface=self.iface
)
def tearDown(self):
self.port.stopListening()
def test_echo(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((self.iface, self.port.getHost().port))
sent = sock.send(self.data)
data = sock.recv(1024)
sock.close()
assert data == self.data
要运行它,我使用以下命令
PYTHONPATH="${PWD}" trial --reactor=default mymodule
输出如下并保持这种状态,直到我终止进程
mymodule.test.test_network
TestTCP
test_echo ...
似乎我错过了有关反应堆如何工作的一些东西。我一直在寻找类似的例子,但无法让它发挥作用。
我应该如何编写测试以获得预期的行为?
解决方案
事实证明,我必须Deffered
使用inlineCallbacks运行测试方法,以便在反应器运行时调用它们。为了测试这种行为,我使用了以下代码片段
from twisted.internet.defer import inlineCallbacks
# [...]
def check_reactor(self):
# time.sleep(100)
return reactor.running
@inlineCallbacks
def test_reactor(self):
reactor_running = yield threads.deferToThread(self.check_reactor)
assert reactor_running == True
...这使得测试成功完成
mymodule.test.test_network
TestTCP
test_reactor ... [OK]
-------------------------------------------------------------------------------
Ran 1 tests in 0.007s
PASSED (successes=1)
如果我在回调方法中启用 sleep(100) 并ncat
在该时间跨度内连接,我发送到侦听端口的数据确实会回显
推荐阅读
- spring - 如何在通量流中获得实际价值,而不是 FluxLift?
- c - 增加维度时的 LDL' 分解错误
- firebase - 使用具有自定义身份验证安全性的 Firebase Cloud Firestore(无 Firebase 身份验证)
- .net - 按下更新异步 intill 键
- python - Python随机频率
- strapi - 如何从 MongoDB 中的 URL 填充 Strapi 中的图像?
- javascript - 滚动行为是否来自 jquery?
- javascript - 使用 pdf.js 以高质量呈现 pdf
- javascript - 压缩母版页 Adobe Forms SAP 中隐藏字段的空间
- mysql - 如何在mysql中使用mysql列作为unix_timestamp