websocket - 如何避免循环参数
问题描述
以下代码是我用 python 3.6 编写的一些自动化测试的一部分:
connected = False
def aiohttp_server(loop):
async def handler(msg, session):
global connected
if msg.type == sockjs.MSG_OPEN:
connected = True
if msg.type == sockjs.MSG_CLOSE:
connected = False
app = web.Application(loop=loop)
sockjs.add_endpoint(app, handler)
runner = web.AppRunner(app)
return runner
def run_server(runner, loop):
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s')
asyncio.set_event_loop(loop)
loop.run_until_complete(runner.setup())
site = web.TCPSite(runner, 'localhost', 8080)
loop.run_until_complete(site.start())
loop.run_forever()
def start_server():
loop = asyncio.new_event_loop()
t = threading.Thread(target=run_server, args=(aiohttp_server(loop),loop,), daemon=True)
t.start()
time.sleep(0.01)
基本上,调用 start_server 应该启动一个简单的 Web 服务器,其中包含一个名为/sockjs
我还不是pythonasync
关键字的大师。有两个问题,我怀疑是相关的:
首先,我收到了关于app = web.Application(loop=loop)
声明的弃用警告:
/home/peter/incubator/sockjs_client/tests/test_sockjs_client.py:25: DeprecationWarning: loop argument is deprecated
app = web.Application(loop=loop)
/home/peter/.local/lib/python3.6/site-packages/sockjs/route.py:54: DeprecationWarning: loop property is deprecated
manager = SessionManager(name, app, handler, app.loop)
其次,测试偶尔会失败。我相信,根据机器负载,有时服务器在测试代码实际开始执行之前没有足够的时间启动。
基本上,我需要该start_server
函数使用 websocket 端点初始化 web 应用程序,并且在应用程序准备好接受 websocket 连接之前不返回。
解决方案
首先,我在 app =
web.Application(loop=loop)
语句上收到弃用警告:
避免loop
到处走动的推荐方法是切换到asyncio.run
. 与其手动管理循环,asyncio.run
不如为您创建(并关闭)循环。如果您的所有工作都在协程中完成,您可以使用get_event_loop()
或访问循环get_running_loop()
。
基本上,我需要 start_server 函数使用 websocket 端点初始化 web 应用程序,并且在应用程序准备好接受 websocket 连接之前不返回。
您可以将 a 传递threading.Event
给设置站点时设置的线程,然后在主线程中等待它。
这是一个实现这两个建议的(未经测试的)示例:
connected = False
def aiohttp_server():
async def handler(msg, session):
global connected
if msg.type == sockjs.MSG_OPEN:
connected = True
if msg.type == sockjs.MSG_CLOSE:
connected = False
app = web.Application()
sockjs.add_endpoint(app, handler)
return web.AppRunner(app)
async def run_server(ready):
logging.basicConfig(level=logging.DEBUG,
format='%(asctime)s %(levelname)s %(message)s')
runner = aiohttp_server()
await runner.setup()
site = web.TCPSite(runner, 'localhost', 8080)
await site.start()
ready.set()
# emulates loop.run_forever()
await asyncio.get_running_loop().create_future()
def start_server():
ready = threading.Event()
threading.Thread(target=asyncio.run, args=(aiohttp_server(ready),),
daemon=True).start()
ready.wait()
推荐阅读
- c++ - 如何获取代码以使用 PayRoll Datatype intemplate 进行编译?适用于整数
- javascript - 如何使用具有关联数组的 API 填写表单?
- docker - WebDriverError:无效的会话 ID
- r - R 没有给出正确的周数
- google-cloud-platform - google云控制台保存的ocr翻译文本在哪里
- ruby-on-rails - Model.group(:id) 抛出错误“选择列表不在 GROUP BY 子句中包含非聚合列“id”
- google-cloud-sql - 谷歌云sql不正确innodb_buffer_pool_size?
- google-cloud-pubsub - Google pubsub_v1 订阅者拉“打开的文件太多”
- jquery - 在 HTML 文件中将变量从 Django 传递给 Jquery
- opencv - 即使在 conda env 中安装 Opencv 也找不到错误