python - 如果我使用缓存的 gRPC 通道,pytest 会失败,仅在多个测试中
问题描述
概括
我正在制作与其他微服务服务器交互的客户端服务。
通过使用 gRPC 通道,该客户端从服务器获取一些数据。连接使用频繁,参与者固定,所以我重用 gRPC Channel 和 Stub 以降低创建通道的成本。
该服务对每个请求都执行良好,并且每个测试一次单独运行。然而,在多次测试中,我发现只有一个测试成功,而另一个会因TimeOutError
(gRPC status - DEADLINE_EXCEEDED
) 而失败或停止。
@lru_cache
有趣的是,当我删除通道缓存( )或event_loop
为会话(或模块)范围添加 pytest 夹具覆盖时,这个问题得到了解决。我在这个问题/答案中找到了第二种方法。
为什么会发生这种情况?是什么让我的测试停止或失败?我猜它与事件循环有关,但不知道细节。
最小可重现示例(MRE)
# mre.py
from functools import lru_cache
from grpclib.client import Channel
from config.config import current_config
from custom.protobuf.service import OtherServiceStub
@lru_cache(maxsize=None)
def get_cached_client() -> OtherServiceStub:
host, port = '127.0.0.1', 50051
channel = Channel(host, port)
cached_client = OtherServiceStub(channel)
return cached_client
async def get_data(arg1: str = None):
client = get_cached_client()
data = client.get_detail_data(arg1='foo')
return data
# test_mre.py
@pytest.mark.asyncio
async def test_1(): # SUCCEED
client = get_cached_client()
await client.get_data(arg1='foo')
@pytest.mark.asyncio
async def test_2(): # FAIL(or STOP)
client = get_cached_client()
await client.get_data(arg1='bar')
@pytest.mark.asyncio
async def test_3(): # FAIL(or STOP)
client = get_cached_client()
await client.get_data(arg1='something')
# solved if(1)
# not cached
def get_cached_client() -> OtherServiceStub:
host, port = '127.0.0.1', 50051
channel = Channel(host, port)
cached_client = OtherServiceStub(channel)
return cached_client
# solved if(2)
# override event_loop fixture
@pytest.fixture(scope="session")
def event_loop(request):
"""Create an instance of the default event loop for each test case."""
loop = asyncio.get_event_loop_policy().new_event_loop()
yield loop
loop.close()
环境
pytest==6.2.4
pytest-asyncio==0.15.1
grpcio==1.37.1
grpcio-tools==1.37.1
grpclib==0.4.1
protobuf==3.15.8
betterproto==1.2.5
解决方案
我发现这个问题源于 gRPC Channel 和 pytest 事件循环夹具的实现。
lru_cache
如果调用相同的函数,则返回缓存结果。这,“相同”意味着如果函数由相同的输入(参数)调用。询问的缓存函数没有参数,因此如果您调用该函数,您将获得与前一次调用完全相同的结果,除了第一次调用。因此,您在测试代码中的 grpc 通道都是完全相同的通道。
# test_mre.py
@pytest.mark.asyncio
async def test_1():
client = get_cached_client() # FIRST CALL - Create Channel & Stub object in here
await client.get_data(arg1='foo')
@pytest.mark.asyncio
async def test_2(): # FAIL(or STOP)
client = get_cached_client() # NOT FIRST CALL - Reuse object which is created in test_1
await client.get_data(arg1='bar')
@pytest.mark.asyncio
async def test_3(): # FAIL(or STOP)
client = get_cached_client() # NOT FIRST CALL - Reuse object which is created in test_1
await client.get_data(arg1='something')
那么,为什么复用的通道不能正常使用呢?问题出在 pytest-asyncio 装饰器中。
@pytest.mark.asyncio
当函数针对它所应用的每个函数完成时,创建新的事件循环并关闭。默认的事件循环范围是function
. 您可以在 pytest-asyncio 中的事件循环夹具的实现中看到这一点。
Python gRPC Channel 对象注册它创建的事件循环环境,并且当该事件循环关闭时 Channel 关闭。在问的例子中,它是test_1
函数 event loop。当您调用同一个通道并尝试在test_2
函数中使用它时,test_1
事件循环已经关闭,因此通道已关闭(running=False
, closed=True
)。这意味着await
请求不会永远得到响应。
@pytest.mark.asyncio
async def test_1():
client = get_cached_client()
await client.get_data(arg1='foo')
# At this point, event loop and the channel is closed.
@pytest.mark.asyncio
async def test_2():
client = get_cached_client() # Calling closed channel
await client.get_data(arg1='bar')
@pytest.mark.asyncio
async def test_3():
client = get_cached_client() # Calling closed channel
await client.get_data(arg1='something')
所以这就是为什么第一个测试成功但其他测试失败的原因。只有在第一个事件循环中,通道是活动的。如果您设置了timeout
参数,那么测试将失败,因为您无法在超时限制内从 gRPC 服务器获得响应(无论多么足够)。如果没有,您会看到所有其他测试都已停止,因为 python gRPC 通道没有默认超时限制。
您的两个解决方案可以解决此问题。首先,如果 Channel 对象没有被缓存,那么每个测试函数都会创建自己的通道,并且事件循环问题被清除。其次,如果您在会话范围内设置默认事件循环,您可以在所有测试函数中重用您的默认事件循环夹具。所以 Channel 对象不会被关闭(因为它的事件循环没有关闭)。
推荐阅读
- java - 为什么在构造函数中调用此函数会产生 stackoverflow 错误?
- r - 不使用 Shiny 的链接图
- android - 在 MPAndroidChart 中的活动之间传递数据集
- cordova - 如何在cordova phonegap中为Windows应用程序设置固定窗口大小
- python - 从网页中的表格获取链接
- python - 如何强制 pandas .loc 返回系列
- javascript - React Redux 操作未调度但请求成功
- wso2 - 在 DAS 中部署 ESB 分析车
- angular - Angular PWA 离线数据处理
- c++ - 我包含哪个标头(cstddef、cstdio、cstdlib 等)以获得 size_t 的定义是否重要?