首页 > 解决方案 > 如果我使用缓存的 gRPC 通道,pytest 会失败,仅在多个测试中

问题描述

概括

我正在制作与其他微服务服务器交互的客户端服务。

通过使用 gRPC 通道,该客户端从服务器获取一些数据。连接使用频繁,参与者固定,所以我重用 gRPC Channel 和 Stub 以降低创建通道的成本。

该服务对每个请求都执行良好,并且每个测试一次单独运行。然而,在多次测试中,我发现只有一个测试成功,而另一个会因TimeOutError(gRPC status - DEADLINE_EXCEEDED) 而失败或停止。

@lru_cache有趣的是,当我删除通道缓存( )或event_loop为会话(或模块)范围添加 pytest 夹具覆盖时,这个问题得到了解决。我在这个问题/答案中找到了第二种方法。

为什么会发生这种情况?是什么让我的测试停止或失败?我猜它与事件循环有关,但不知道细节。


最小可重现示例(MRE)

# mre.py
from functools import lru_cache

from grpclib.client import Channel

from config.config import current_config
from custom.protobuf.service import OtherServiceStub


@lru_cache(maxsize=None)
def get_cached_client() -> OtherServiceStub:
    host, port = '127.0.0.1', 50051
    channel = Channel(host, port)
    cached_client = OtherServiceStub(channel)
    return cached_client

async def get_data(arg1: str = None):
    client = get_cached_client()
    data = client.get_detail_data(arg1='foo')
    return data
# test_mre.py

@pytest.mark.asyncio
async def test_1(): # SUCCEED
    client = get_cached_client()
    await client.get_data(arg1='foo')


@pytest.mark.asyncio
async def test_2(): # FAIL(or STOP)
    client = get_cached_client()
    await client.get_data(arg1='bar')

@pytest.mark.asyncio
async def test_3(): # FAIL(or STOP)
    client = get_cached_client()
    await client.get_data(arg1='something')
# solved if(1)
# not cached
def get_cached_client() -> OtherServiceStub:
    host, port = '127.0.0.1', 50051
    channel = Channel(host, port)
    cached_client = OtherServiceStub(channel)
    return cached_client

# solved if(2)
# override event_loop fixture
@pytest.fixture(scope="session")
def event_loop(request):
    """Create an instance of the default event loop for each test case."""
    loop = asyncio.get_event_loop_policy().new_event_loop()
    yield loop
    loop.close()

环境

pytest==6.2.4
pytest-asyncio==0.15.1
grpcio==1.37.1
grpcio-tools==1.37.1
grpclib==0.4.1
protobuf==3.15.8
betterproto==1.2.5

标签: pythoncachingpytestgrpcgrpc-python

解决方案


我发现这个问题源于 gRPC Channel 和 pytest 事件循环夹具的实现。

lru_cache如果调用相同的函数,则返回缓存结果。这,“相同”意味着如果函数由相同的输入(参数)调用。询问的缓存函数没有参数,因此如果您调用该函数,您将获得与前一次调用完全相同的结果,除了第一次调用。因此,您在测试代码中的 grpc 通道都是完全相同的通道。

# test_mre.py

@pytest.mark.asyncio
async def test_1(): 
    client = get_cached_client() # FIRST CALL - Create Channel & Stub object in here
    await client.get_data(arg1='foo')


@pytest.mark.asyncio
async def test_2(): # FAIL(or STOP)
    client = get_cached_client() # NOT FIRST CALL - Reuse object which is created in test_1
    await client.get_data(arg1='bar')

@pytest.mark.asyncio
async def test_3(): # FAIL(or STOP)
    client = get_cached_client() # NOT FIRST CALL - Reuse object which is created in test_1
    await client.get_data(arg1='something')

那么,为什么复用的通道不能正常使用呢?问题出在 pytest-asyncio 装饰器中。

@pytest.mark.asyncio当函数针对它所应用的每个函数完成时,创建新的事件循环并关闭。默认的事件循环范围是function. 您可以在 pytest-asyncio 中的事件循环夹具的实现中看到这一点。

Python gRPC Channel 对象注册它创建的事件循环环境,并且当该事件循环关闭时 Channel 关闭。在问的例子中,它是test_1函数 event loop。当您调用同一个通道并尝试在test_2函数中使用它时,test_1事件循环已经关闭,因此通道已关闭(running=False, closed=True)。这意味着await请求不会永远得到响应。

@pytest.mark.asyncio
async def test_1(): 
    client = get_cached_client()
    await client.get_data(arg1='foo')
    # At this point, event loop and the channel is closed.


@pytest.mark.asyncio
async def test_2(): 
    client = get_cached_client() # Calling closed channel
    await client.get_data(arg1='bar')

@pytest.mark.asyncio
async def test_3(): 
    client = get_cached_client() # Calling closed channel
    await client.get_data(arg1='something')

所以这就是为什么第一个测试成功但其他测试失败的原因。只有在第一个事件循环中,通道是活动的。如果您设置了timeout参数,那么测试将失败,因为您无法在超时限制内从 gRPC 服务器获得响应(无论多么足够)。如果没有,您会看到所有其他测试都已停止,因为 python gRPC 通道没有默认超时限制。

您的两个解决方案可以解决此问题。首先,如果 Channel 对象没有被缓存,那么每个测试函数都会创建自己的通道,并且事件循环问题被清除。其次,如果您在会话范围内设置默认事件循环,您可以在所有测试函数中重用您的默认事件循环夹具。所以 Channel 对象不会被关闭(因为它的事件循环没有关闭)。


推荐阅读