python - 有没有比这更好的方法来混合 websocket recv 和发送调用?
问题描述
目前,我正在使用asyncio.wait_for
轮询websocket.recv
,以便websocket.send
在需要时也可以调用:
async def client_reader():
websocket = await websockets.connect(f"ws://localhost:{PORT}")
result = ''
while True:
try:
# Is there a better way to recv from a websocket?
result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
if result.endswith('\n'):
result = result.rstrip()
print(f"result = {result}")
result = ''
# I need to do other asyncio here
await some_other_work()
except asyncio.TimeoutError:
pass
我只能找到使用过的玩具示例asyncio
的所有文档。websockets
有一个更好的方法吗?
这是一个模拟我正在做的事情的独立程序:
import asyncio
import websockets
import random
import multiprocessing
import time
PORT = 49152
text = """\
This is a bunch of text that will be used to
simulate a server sending multiple lines of
text to a client with a random amount of delay
between each line.
"""
def get_next_line():
text_list = text.split('\n')
while True:
for line in text_list:
yield line + '\n'
line_generator = get_next_line()
async def delay_server(websocket, path):
while True:
await asyncio.sleep(random.random() * 5.0)
line = next(line_generator)
await websocket.send(line)
def server_func():
try:
start_server = websockets.serve(delay_server, "localhost", PORT)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
pass
async def some_other_work():
print("I occasionally need to call websocket.send() here")
async def client_reader():
websocket = await websockets.connect(f"ws://localhost:{PORT}")
result = ''
while True:
try:
result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
if result.endswith('\n'):
result = result.rstrip()
print(f"result = {result}")
result = ''
await some_other_work()
except asyncio.TimeoutError:
pass
def client_func():
try:
asyncio.run(client_reader())
except KeyboardInterrupt:
pass
server_proc = multiprocessing.Process(target=server_func)
server_proc.daemon = True
server_proc.start()
client_proc = multiprocessing.Process(target=client_func)
client_proc.daemon = True
client_proc.start()
try:
while True:
time.sleep(1.0)
except KeyboardInterrupt:
pass
server_proc.join()
client_proc.join()
解决方案
我将代码更改为如下所示:
async def client_reader(websocket, result):
try:
result = result + await asyncio.wait_for(websocket.recv(), timeout=1.0)
if result.endswith('\n'):
result = result.rstrip()
print(f"result = {result}")
result = ''
except asyncio.TimeoutError:
pass
return result
async def if_flag_send_message():
print("if a flag is set, I will call websocket.send here")
async def client_writer(websocket):
await if_flag_send_message()
await asyncio.sleep(1.0)
async def client_handler():
websocket = await websockets.connect(f"ws://localhost:{PORT}")
result = ''
while True:
reader_task = asyncio.create_task(client_reader(websocket, result))
writer_task = asyncio.create_task(client_writer(websocket))
await asyncio.gather(reader_task, writer_task)
result = reader_task.result()
我仍然不确定这是做事的正确方法,但它确实有效。
推荐阅读
- git - 与 Visual Studio 同步错误;检查此分支并在再次推送之前集成远程更改
- python - OSError: [Errno 22] 尝试创建 txt 文件时参数无效
- python - Pandas:根据较长字符串列表中的匹配项展开列字符串值
- sql - 使用 select 语句仅从 nvarchar 列输出某些值
- node.js - 关闭浏览器选项卡或窗口时,Socket.io 连接未正确断开
- asp.net-mvc - PagedListRenderOptions :如何添加 FunctionToTransformEachPageLink 页面链接类?
- javascript - tooltipster - 在多个工具提示上使用一个功能 - origin.helper 或一个或多个工具提示的问题已经存在
- dax - 如何使用 dax 基于左连接在表中添加列
- content-management-system - Publii previousPost 和 nextPost 不起作用
- python - 初始化 tkinter gui 导致主线程不在主循环中