首页 > 解决方案 > python websockets - 如何做一个简单的同步发送命令?

问题描述

我是 websockets 的新手。我一直在使用 websockets 文档的入门页面上的示例,主要是同步示例。

在这个用例中,我在 localhost 上有一个 sqlite3 数据库。我从 localhost 上的 python GUI 程序编辑该数据库,该程序直接导入数据库代码层。然后客户端告诉 websocket 服务器向所有客户端发送一些提取的数据。

(最终这将在 LAN 上,服务器机器运行 Flask API。)

这是有效的,使用下面的代码,但不清楚我是否正确地做。基本上我想在发生某些数据库活动时发送 websockets 消息,并且我对如何在从代码调用时执行“简单”非异步发送感到困惑,最终响应 GUI 交互,而不是执行发送以响应传入的 websocket 消息。在伪代码中:

def send(ws,msg):
  ws.send(msg)

send(ws,'OK!')

我完成的方式是将执行发送的异步 def 包装在非异步“香草”def 中。

websocket服务器代码:

# modified from https://websockets.readthedocs.io/en/stable/intro.html#synchronization-example

import asyncio
import websockets

USERS = set()

async def register(websocket):
    print("register: "+str(websocket))
    USERS.add(websocket)

async def unregister(websocket):
    print("unregister: "+str(websocket))
    USERS.remove(websocket)

# each new connection calls trackerHandler, resulting in a new USERS entry
async def trackerHandler(websocket, path):
    await register(websocket)
    try:
        async for message in websocket:
            await asyncio.wait([user.send(message) for user in USERS])
    finally:
        await unregister(websocket)

start_server = websockets.serve(trackerHandler, "localhost", 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

在数据库接口代码中(在 localhost 上,这个文件只是直接导入到 GUI 应用程序中;但在 LAN 服务器上,这是在 Flask 中的 WSGI 调用中指定的文件):

import asyncio
import websockets
# uri = "ws://localhost:8765"

# wrap the asynchronous send function inside a synchronous function

def wsSend(uri,msg):
    async def send():
        async with websockets.connect(uri) as websocket:
            # await websocket.send(str.encode(str(msg)))
            await websocket.send(json.dumps({"msg":msg}))
            # print(f"> {msg}")

            # greeting = await websocket.recv()
            # print(f"< {greeting}")

    asyncio.get_event_loop().run_until_complete(send())

...
...

def tdbPushTables(uri,teamsViewList=None,assignmentsViewList=None,teamsCountText="---",assignmentsCountText="---"):
    # uri = "ws://localhost:8765"
    if not teamsViewList:
        teamsViewList=tdbGetTeamsView()
    if not assignmentsViewList:
        assignmentsViewList=tdbGetAssignmentsView()
    if uri=='pusher':
        pusher_client.trigger('my-channel','teamsViewUpdate',teamsViewList)
        pusher_client.trigger('my-channel','assignmentsViewUpdate',teamsViewList)
    else:
        wsSend(uri,json.dumps({
                "teamsView":teamsViewList,
                "assignmentsView":assignmentsViewList,
                "teamsCount":teamsCountText,
                "assignmentsCount":assignmentsCountText}))

实际上是客户端发起了对 tdbPushTables 的调用:

def buildLists(self):
    self.teamsList=tdbGetTeamsView()
    self.assignmentsList=tdbGetAssignmentsView()
    self.updateCounts()
    tdbPushTables('ws://localhost:8765',self.teamsList,self.assignmentsList,self.teamsCountText,self.assignmentsCountText)

感觉很诡异 是令人毛骨悚然还是这实际上是正确的方法?我应该为服务器使用 websockets 模块,而是使用不同的模块来将 websocket 消息“简单”/同步发送到服务器吗?

此解决方案的两个已知副作用:1)它在每次调用时打开和关闭 websocket 连接 - 可能不是真正的问题......?,以及 2)它会在服务器脚本中产生类似这样的非致命处理消息:

register: <websockets.server.WebSocketServerProtocol object at 0x041C46F0>
Task exception was never retrieved
future: <Task finished coro=<WebSocketCommonProtocol.send() done, defined at C:\Users\caver\AppData\Roaming\Python\Python37\site-packages\websockets\protocol.py:521> exception=ConnectionClosedOK('code = 1000 (OK), no reason')>
Traceback (most recent call last):
  File "C:\Users\caver\AppData\Roaming\Python\Python37\site-packages\websockets\protocol.py", line 555, in send
    await self.ensure_open()
  File "C:\Users\caver\AppData\Roaming\Python\Python37\site-packages\websockets\protocol.py", line 812, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedOK: code = 1000 (OK), no reason
unregister: <websockets.server.WebSocketServerProtocol object at 0x041C46F0>

编辑:看起来websocket(单数)模块有一个同步接口,websockets(复数)文档解释说,如果你想同步,你应该使用不同的模块;所以,这有效:

(而不是导入 asyncio 和 websockets)

from websocket import create_connection

def wsSend(uri,msg):
    ws=create_connection(uri)
    ws.send(json.dumps({"msg":msg}))
    ws.close()

每次调用 wsSend 时,它仍然会导致在服务器脚本中显示相同的已处理回溯;可能有一种方法可以使该回溯输出静音,但是无论如何,它似乎仍然没有影响任何事情。

标签: pythonwebsocket

解决方案


您的代码感觉很诡异,因为您将async代码与同步代码混合在一起。

根据个人经验,如果您将大部分代码保持异步,则代码更易于遵循。

结构将变为:

import asyncio
import websockets

async def main():
    # Create websocket connection
    async with websockets.connect(uri) as websocket:
        await your_function_that_does_some_processing(websocket)


asyncio.get_event_loop().run_until_complete(main())

请记住,大部分阻塞代码会产生问题。


推荐阅读