python - python websockets - 如何做一个简单的同步发送命令?
问题描述
我是 websockets 的新手。我一直在使用 websockets 文档的入门页面上的示例,主要是同步示例。
在这个用例中,我在 localhost 上有一个 sqlite3 数据库。我从 localhost 上的 python GUI 程序编辑该数据库,该程序直接导入数据库代码层。然后客户端告诉 websocket 服务器向所有客户端发送一些提取的数据。
(最终这将在 LAN 上,服务器机器运行 Flask API。)
这是有效的,使用下面的代码,但不清楚我是否正确地做。基本上我想在发生某些数据库活动时发送 websockets 消息,并且我对如何在从代码调用时执行“简单”非异步发送感到困惑,最终响应 GUI 交互,而不是执行发送以响应传入的 websocket 消息。在伪代码中:
def send(ws,msg):
ws.send(msg)
send(ws,'OK!')
我完成的方式是将执行发送的异步 def 包装在非异步“香草”def 中。
websocket服务器代码:
# modified from https://websockets.readthedocs.io/en/stable/intro.html#synchronization-example
import asyncio
import websockets
USERS = set()
async def register(websocket):
print("register: "+str(websocket))
USERS.add(websocket)
async def unregister(websocket):
print("unregister: "+str(websocket))
USERS.remove(websocket)
# each new connection calls trackerHandler, resulting in a new USERS entry
async def trackerHandler(websocket, path):
await register(websocket)
try:
async for message in websocket:
await asyncio.wait([user.send(message) for user in USERS])
finally:
await unregister(websocket)
start_server = websockets.serve(trackerHandler, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
在数据库接口代码中(在 localhost 上,这个文件只是直接导入到 GUI 应用程序中;但在 LAN 服务器上,这是在 Flask 中的 WSGI 调用中指定的文件):
import asyncio
import websockets
# uri = "ws://localhost:8765"
# wrap the asynchronous send function inside a synchronous function
def wsSend(uri,msg):
async def send():
async with websockets.connect(uri) as websocket:
# await websocket.send(str.encode(str(msg)))
await websocket.send(json.dumps({"msg":msg}))
# print(f"> {msg}")
# greeting = await websocket.recv()
# print(f"< {greeting}")
asyncio.get_event_loop().run_until_complete(send())
...
...
def tdbPushTables(uri,teamsViewList=None,assignmentsViewList=None,teamsCountText="---",assignmentsCountText="---"):
# uri = "ws://localhost:8765"
if not teamsViewList:
teamsViewList=tdbGetTeamsView()
if not assignmentsViewList:
assignmentsViewList=tdbGetAssignmentsView()
if uri=='pusher':
pusher_client.trigger('my-channel','teamsViewUpdate',teamsViewList)
pusher_client.trigger('my-channel','assignmentsViewUpdate',teamsViewList)
else:
wsSend(uri,json.dumps({
"teamsView":teamsViewList,
"assignmentsView":assignmentsViewList,
"teamsCount":teamsCountText,
"assignmentsCount":assignmentsCountText}))
实际上是客户端发起了对 tdbPushTables 的调用:
def buildLists(self):
self.teamsList=tdbGetTeamsView()
self.assignmentsList=tdbGetAssignmentsView()
self.updateCounts()
tdbPushTables('ws://localhost:8765',self.teamsList,self.assignmentsList,self.teamsCountText,self.assignmentsCountText)
感觉很诡异 是令人毛骨悚然还是这实际上是正确的方法?我应该为服务器使用 websockets 模块,而是使用不同的模块来将 websocket 消息“简单”/同步发送到服务器吗?
此解决方案的两个已知副作用:1)它在每次调用时打开和关闭 websocket 连接 - 可能不是真正的问题......?,以及 2)它会在服务器脚本中产生类似这样的非致命处理消息:
register: <websockets.server.WebSocketServerProtocol object at 0x041C46F0>
Task exception was never retrieved
future: <Task finished coro=<WebSocketCommonProtocol.send() done, defined at C:\Users\caver\AppData\Roaming\Python\Python37\site-packages\websockets\protocol.py:521> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>
Traceback (most recent call last):
File "C:\Users\caver\AppData\Roaming\Python\Python37\site-packages\websockets\protocol.py", line 555, in send
await self.ensure_open()
File "C:\Users\caver\AppData\Roaming\Python\Python37\site-packages\websockets\protocol.py", line 812, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason
unregister: <websockets.server.WebSocketServerProtocol object at 0x041C46F0>
编辑:看起来websocket(单数)模块有一个同步接口,websockets(复数)文档解释说,如果你想同步,你应该使用不同的模块;所以,这有效:
(而不是导入 asyncio 和 websockets)
from websocket import create_connection
def wsSend(uri,msg):
ws=create_connection(uri)
ws.send(json.dumps({"msg":msg}))
ws.close()
每次调用 wsSend 时,它仍然会导致在服务器脚本中显示相同的已处理回溯;可能有一种方法可以使该回溯输出静音,但是无论如何,它似乎仍然没有影响任何事情。
解决方案
您的代码感觉很诡异,因为您将async
代码与同步代码混合在一起。
根据个人经验,如果您将大部分代码保持异步,则代码更易于遵循。
结构将变为:
import asyncio
import websockets
async def main():
# Create websocket connection
async with websockets.connect(uri) as websocket:
await your_function_that_does_some_processing(websocket)
asyncio.get_event_loop().run_until_complete(main())
请记住,大部分阻塞代码会产生问题。
推荐阅读
- python-3.x - 如何从 Robotframework 调用 python 脚本作为 Teardown 的一部分
- apache-spark - 如何使用 REST API 从 Spark 历史服务器获取查询执行计划?
- java - 为什么 permitAll() 返回 403 spring security?
- java - 使用 ManyToOne 关联查询导致 SQLException
- jquery - 如何使用 jquery 更改嵌套 span 的值?
- swiftui - Swiftui UIViewRepresentable 表格视图在滚动时具有填充
- javascript - 无法在 Quasar 应用程序的 Vue.js 组件上的“this.$attrs”上定义类型
- objective-c - 如何在Objective C中执行从小部件到父应用程序的深层链接
- elasticsearch - 弹性搜索结果不一致——查询多个分片
- amazon-web-services - Terraform:导入 aws 资源时出现凭证错误 - 调用 sts 时出错:GetCallerIdentity:ExpiredToken