首页 > 解决方案 > 如何避免循环参数

问题描述

以下代码是我用 python 3.6 编写的一些自动化测试的一部分:

connected = False

def aiohttp_server(loop):
    async def handler(msg, session):
        global connected
        if msg.type == sockjs.MSG_OPEN:
            connected = True
        if msg.type == sockjs.MSG_CLOSE:
            connected = False

    app = web.Application(loop=loop)
    sockjs.add_endpoint(app, handler)
    runner = web.AppRunner(app)
    return runner

def run_server(runner, loop):
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')

    asyncio.set_event_loop(loop)
    loop.run_until_complete(runner.setup())
    site = web.TCPSite(runner, 'localhost', 8080)
    loop.run_until_complete(site.start())
    loop.run_forever()

def start_server():
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=run_server, args=(aiohttp_server(loop),loop,), daemon=True)
    t.start()
    time.sleep(0.01)

基本上,调用 start_server 应该启动一个简单的 Web 服务器,其中包含一个名为/sockjs

我还不是pythonasync关键字的大师。有两个问题,我怀疑是相关的:

首先,我收到了关于app = web.Application(loop=loop)声明的弃用警告:

/home/peter/incubator/sockjs_client/tests/test_sockjs_client.py:25: DeprecationWarning: loop argument is deprecated
  app = web.Application(loop=loop)
/home/peter/.local/lib/python3.6/site-packages/sockjs/route.py:54: DeprecationWarning: loop property is deprecated
  manager = SessionManager(name, app, handler, app.loop)

其次,测试偶尔会失败。我相信,根据机器负载,有时服务器在测试代码实际开始执行之前没有足够的时间启动。

基本上,我需要该start_server函数使用 websocket 端点初始化 web 应用程序,并且在应用程序准备好接受 websocket 连接之前不返回。

标签: websocketautomated-testspython-asyncio

解决方案


首先,我在 app =web.Application(loop=loop)语句上收到弃用警告:

避免loop到处走动的推荐方法是切换到asyncio.run. 与其手动管理循环,asyncio.run不如为您创建(并关闭)循环。如果您的所有工作都在协程中完成,您可以使用get_event_loop()或访问循环get_running_loop()

基本上,我需要 start_server 函数使用 websocket 端点初始化 web 应用程序,并且在应用程序准备好接受 websocket 连接之前不返回。

您可以将 a 传递threading.Event给设置站点时设置的线程,然后在主线程中等待它。

这是一个实现这两个建议的(未经测试的)示例:

connected = False

def aiohttp_server():
    async def handler(msg, session):
        global connected
        if msg.type == sockjs.MSG_OPEN:
            connected = True
        if msg.type == sockjs.MSG_CLOSE:
            connected = False

    app = web.Application()
    sockjs.add_endpoint(app, handler)
    return web.AppRunner(app)

async def run_server(ready):
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s %(levelname)s %(message)s')
    runner = aiohttp_server()
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    ready.set()
    # emulates loop.run_forever()
    await asyncio.get_running_loop().create_future()

def start_server():
    ready = threading.Event()
    threading.Thread(target=asyncio.run, args=(aiohttp_server(ready),),
                     daemon=True).start()
    ready.wait()

推荐阅读