首页 > 解决方案 > 有没有更好的方法来混合使用 websocket 的 recv 和 send 调用?

问题描述

目前,我使用 asyncio.wait_for 轮询 websocket.recv,以便在需要时也能调用 websocket.send:

async def client_reader():
    """Poll the websocket for text and run the other async work between polls.

    Received fragments are accumulated until the buffer ends with a newline;
    the completed text is then printed with trailing whitespace removed and
    the buffer is cleared.  The loop has no exit condition, so the coroutine
    finishes only when an exception propagates or it is cancelled.
    """
    # The context manager closes the connection when the loop is left via an
    # exception or cancellation.
    async with websockets.connect(f"ws://localhost:{PORT}") as websocket:
        result = ''
        while True:
            try:
                # Is there a better way to recv from a websocket?
                chunk = await asyncio.wait_for(websocket.recv(), timeout=1.0)
            except asyncio.TimeoutError:
                # Nothing arrived within this one-second poll.
                pass
            else:
                result += chunk
                if result.endswith('\n'):
                    print(f"result = {result.rstrip()}")
                    result = ''
            # I need to do other asyncio here.  It is kept outside the try so
            # that it also runs after a poll that timed out, not only after a
            # message was received.
            await some_other_work()

我能找到的所有关于 asyncio 的文档都只使用了玩具示例。使用 websockets 时,有没有更好的方法?

这是一个模拟我正在做的事情的独立程序:

import asyncio
import websockets
import random
import multiprocessing
import time

# TCP port used both by the server (websockets.serve) and in the client URL.
PORT = 49152

# Sample payload that get_next_line() splits into lines and replays in a loop.
# The backslash right after the opening quotes suppresses the newline that
# would otherwise make the string start with an empty line.
text = """\
This is a bunch of text that will be used to
simulate a server sending multiple lines of
text to a client with a random amount of delay
between each line.
"""

def get_next_line(source=None):
    """Yield the lines of *source* forever, each terminated by a newline.

    Args:
        source: Text to cycle through.  When omitted, the module-level
            ``text`` is used.

    Yields:
        One line at a time with a newline appended; after the last line the
        sequence starts over from the first.
    """
    if source is None:
        source = text
    # splitlines() produces no trailing empty element for text that ends with
    # a newline, so no blank line is emitted between cycles.  The fallback
    # keeps an empty source yielding bare newlines instead of looping forever
    # without ever yielding.
    lines = source.splitlines() or ['']
    while True:
        for line in lines:
            yield line + '\n'

# One module-level generator instance: every next(line_generator) call in
# delay_server advances this same sequence.
line_generator = get_next_line()

async def delay_server(websocket, path=None):
    """Connection handler: send lines one by one with a random pause before each.

    Args:
        websocket: Connection object to send on.
        path: Unused.  It is optional so the handler can be invoked either as
            ``handler(websocket, path)`` or as ``handler(websocket)``; which
            form is used depends on the installed websockets version.

    The loop has no exit condition; it ends only when an awaited call raises
    or the handler is cancelled.
    """
    while True:
        # random.random() is in [0.0, 1.0), so the pause is under five seconds.
        await asyncio.sleep(random.random() * 5.0)
        line = next(line_generator)
        await websocket.send(line)

def server_func():
    """Process entry point: serve delay_server on localhost:PORT until interrupted.

    Uses asyncio.run(), like client_func, rather than fetching an event loop
    with asyncio.get_event_loop() outside of a running loop.  A
    KeyboardInterrupt is swallowed so the process exits quietly.
    """
    async def _serve_forever():
        # Leaving the context manager closes the server.
        async with websockets.serve(delay_server, "localhost", PORT):
            # A future that is never resolved keeps the server running until
            # this coroutine is cancelled.
            await asyncio.Future()

    try:
        asyncio.run(_serve_forever())
    except KeyboardInterrupt:
        pass

async def some_other_work():
    """Placeholder for the client's other async work; it only prints a notice."""
    notice = "I occasionally need to call websocket.send() here"
    print(notice)

async def client_reader():
    """Poll the websocket for text and run the other async work between polls.

    Received fragments are accumulated until the buffer ends with a newline;
    the completed text is then printed with trailing whitespace removed and
    the buffer is cleared.  The loop has no exit condition, so the coroutine
    finishes only when an exception propagates or it is cancelled.
    """
    # The context manager closes the connection when the loop is left via an
    # exception or cancellation.
    async with websockets.connect(f"ws://localhost:{PORT}") as websocket:
        result = ''
        while True:
            try:
                chunk = await asyncio.wait_for(websocket.recv(), timeout=1.0)
            except asyncio.TimeoutError:
                # Nothing arrived within this one-second poll.
                pass
            else:
                result += chunk
                if result.endswith('\n'):
                    print(f"result = {result.rstrip()}")
                    result = ''
            # Kept outside the try so the other work also runs after a poll
            # that timed out, not only after a message was received.
            await some_other_work()

def client_func():
    """Process entry point: run client_reader() to completion on a new event loop.

    A KeyboardInterrupt is swallowed so the process exits quietly.
    """
    try:
        reader_coro = client_reader()
        asyncio.run(reader_coro)
    except KeyboardInterrupt:
        return

# The guard keeps the launcher from running again when this module is imported
# rather than executed, which is what happens inside each child process under
# multiprocessing's "spawn" start method.
if __name__ == "__main__":
    server_proc = multiprocessing.Process(target=server_func)
    server_proc.daemon = True
    server_proc.start()
    # NOTE(review): the client is started without waiting for the server to be
    # listening, so its first connection attempt may be refused -- confirm
    # whether a retry or a short delay is wanted here.
    client_proc = multiprocessing.Process(target=client_func)
    client_proc.daemon = True
    client_proc.start()
    try:
        # Idle in the parent until Ctrl-C.
        while True:
            time.sleep(1.0)
    except KeyboardInterrupt:
        pass
    server_proc.join()
    client_proc.join()

标签: python, websocket, python-asyncio

解决方案


我将代码更改为如下所示:

async def client_reader(websocket, result):
    """Perform one receive step and return the updated text buffer.

    Waits up to one second for ``websocket.recv()``.  On timeout the buffer is
    returned untouched.  Otherwise the received text is appended; if the
    buffer then ends with a newline it is printed with trailing whitespace
    removed and an empty buffer is returned, else the grown buffer is
    returned.
    """
    try:
        incoming = await asyncio.wait_for(websocket.recv(), timeout=1.0)
        buffered = result + incoming
        if not buffered.endswith('\n'):
            return buffered
        print(f"result = {buffered.rstrip()}")
        return ''
    except asyncio.TimeoutError:
        return result

async def if_flag_send_message():
    """Placeholder for a conditional send; it only prints a notice."""
    notice = "if a flag is set, I will call websocket.send here"
    print(notice)

async def client_writer(websocket):
    """Perform one write step: run the send placeholder, then pause one second.

    ``websocket`` is accepted but not used by this body.
    """
    pause_seconds = 1.0
    await if_flag_send_message()
    await asyncio.sleep(pause_seconds)

async def client_handler():
    """Connect once, then repeatedly run one read step and one write step together.

    Each iteration runs client_reader() and client_writer() concurrently and
    carries the reader's returned buffer into the next iteration.  The loop
    has no exit condition, so the coroutine finishes only when an exception
    propagates or it is cancelled.
    """
    # The context manager closes the connection when the loop is left via an
    # exception or cancellation.
    async with websockets.connect(f"ws://localhost:{PORT}") as websocket:
        result = ''
        while True:
            # gather() runs both awaitables concurrently and returns their
            # results in argument order; only the reader's value is needed.
            result, _ = await asyncio.gather(
                client_reader(websocket, result),
                client_writer(websocket),
            )

我仍然不确定这是否是正确的做法,但它确实可行。


推荐阅读