python - 使用 asyncio 在 Python 中流式传输字典列表
问题描述
我正在开发 Python 服务器/客户端应用程序,其中服务器从客户端接收一些数据,并根据这些数据从嵌入式 k/v 存储中收集字典列表并将其流回。我在这里放了一个重现错误的代码。我将所有内容放在服务器端的单独函数中是有原因的(客户端发送不同的请求)。
问题是服务器发送的速度超过了客户端可以消耗的速度,并且客户端一次读取多个响应,有时它只是被截断的消息的一部分。我认为 writelines/readline 对将适当地从套接字读取,但我想我错过了一些东西。write/drain 也会使套接字过载,并且一旦读取多个结果,客户端就会失败,因为将分块的序列化字典读取到 orjson.loads。
解决这个问题的正确方法是什么?先感谢您!
服务器:
import orjson
async def getResult(cnt : int):
await asyncio.sleep(0)
result = []
for i in range(cnt):
result.append({"key" : i})
return result
async def send(writer, list_of_dict):
for r in list_of_dict:
print(f"\nSending: {r}")
writer.writelines([orjson.dumps(r)])
await writer.drain()
# sending END signal
writer.writelines([orjson.dumps("END")])
await writer.drain()
async def handleClient(reader, writer):
addr = writer.get_extra_info('peername')
print(f"Connection from {addr}")
data = await reader.readline()
message = orjson.loads(data)
print(f"Received {message} from {addr}")
counter = message["send_me"]
responses = await getResult(counter)
await send(writer, responses)
print("Close the client socket")
writer.close()
loop = asyncio.get_event_loop()
coro = asyncio.start_server(handleClient, '127.0.0.1', 4000, loop=loop)
server = loop.run_until_complete(coro)
# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
客户
import asyncio
import orjson
async def async_client(loop):
reader, writer = await asyncio.open_connection('127.0.0.1', 4000, loop=loop)
counter = 5
print(f"Request counter: {counter}")
# in real life the message is a complex dictionary
msg = {"send_me" : counter}
writer.writelines([orjson.dumps(msg)])
#without write_eof the server reader.readline() waits for data and blocks
if writer.can_write_eof():
writer.write_eof()
while True:
data = await reader.readline()
if data:
print(data)
r = orjson.loads(data)
print(f"Received: {r}")
if r == "END":
print("server completed")
break
else:
await asyncio.sleep(0.1)
print('Close the socket')
writer.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(async_client(loop))
loop.close()
错误:
>python echo_client.py
Request counter: 5
b'{"key":0}{"key":1}{"key":2}{"key":3}{"key":4}"END"'
Traceback (most recent call last):
File "echo_client.py", line 32, in <module>
loop.run_until_complete(async_client(loop))
File "C:\Program Files (x86)\Anaconda\lib\asyncio\base_events.py", line 587, in run_until_complete
return future.result()
File "echo_client.py", line 21, in async_client
r = orjson.loads(data)
orjson.JSONDecodeError: trailing characters at line 1 column 10: line 1 column 1 (char 0)
解决方案
我认为问题要简单得多:writelines
没有做你认为的事情。它不插入换行符,它只是写入你给它的任何数据。这就是为什么readline()
您的客户端会获取有效负载并"END"
连接在一起的原因。这也是为什么你需要write_eof
在另一个方向。
如果你想写一行,那么只需在你的有效载荷之后写一个换行符(字节)。您可以在为您处理它的函数中抽象它:
async def write_msg(writer, msg):
writer.write(orjson.dumps(msg))
writer.write('\n')
await writer.drain()
async def read_msg(reader):
line = await reader.readline()
return orjson.loads(line)
您可以在客户端和服务器上使用它们进行通信。
顺便说一句,您可能应该切换到较新的asyncio.run()
API,它使用单个异步入口点创建并正确拆除事件循环。您的服务器设置如下所示:
async def main():
await asyncio.start_server(handleClient, '127.0.0.1', 4000)
await server.wait_closed()
asyncio.run(main())
推荐阅读
- spring-boot - 使用执行器初始化 SpringBoot 2.1.8.RELEASE 时出错
- powershell - 如何在Powershell中从没有标题的文本文件中选择按制表符拆分的前五列?
- c# - 将 LIST 从 c# 发送到 angular
- quire-api - 新 API - 将任务添加到板?
- swift - 为什么 becomeFirstResponder 不会在警报消息文本字段上触发软键盘
- android - 如何在 Android 10 中获取有关手机 IMEI 的信息?
- jquery - 如何使用 jQuery 无法拖动的某些项目制作可排序列表
- php - 在 Laravel 中创建嵌套关系
- sqlite - 如何在 Flutter 应用中将 SQLite 数据导出为 CSV 文件
- html - 使用nodejs时,如何访问根目录之外的文件?