首页 > 解决方案 > 使用 asyncio 在 Python 中流式传输字典列表

问题描述

我正在开发 Python 服务器/客户端应用程序,其中服务器从客户端接收一些数据,并根据这些数据从嵌入式 k/v 存储中收集字典列表并将其流回。我在这里放了一个重现错误的代码。我将所有内容放在服务器端的单独函数中是有原因的(客户端发送不同的请求)。

问题是服务器发送的速度超过了客户端可以消耗的速度,并且客户端一次读取多个响应,有时它只是被截断的消息的一部分。我认为 writelines/readline 对将适当地从套接字读取,但我想我错过了一些东西。write/drain 也会使套接字过载,并且一旦读取多个结果,客户端就会失败,因为将分块的序列化字典读取到 orjson.loads。

解决这个问题的正确方法是什么?先感谢您!

服务器:

import orjson

async def getResult(cnt : int):
    await asyncio.sleep(0)
    result = []
    for i in range(cnt):
        result.append({"key" : i})
    return result

async def send(writer, list_of_dict):
    for r in list_of_dict:
        print(f"\nSending: {r}")
        writer.writelines([orjson.dumps(r)])
        await writer.drain()
    # sending END signal
    writer.writelines([orjson.dumps("END")])
    await writer.drain()

async def handleClient(reader, writer):
    addr = writer.get_extra_info('peername')
    print(f"Connection from {addr}")
    data = await reader.readline()
    message = orjson.loads(data)
    print(f"Received {message} from {addr}")
    counter = message["send_me"]
    responses = await getResult(counter)
    await send(writer, responses)
    print("Close the client socket")
    writer.close()


loop = asyncio.get_event_loop()
coro = asyncio.start_server(handleClient, '127.0.0.1', 4000, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
 
# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

客户

import asyncio
import orjson
 
async def async_client(loop):
    reader, writer = await asyncio.open_connection('127.0.0.1', 4000, loop=loop)
    counter = 5
    print(f"Request counter: {counter}")
    # in real life the message is a complex dictionary
    msg = {"send_me" : counter}
    writer.writelines([orjson.dumps(msg)])
    #without write_eof the server reader.readline() waits for data and blocks
    if writer.can_write_eof():
        writer.write_eof()

    while True:
        data = await reader.readline()
        if data:
            print(data)
            r = orjson.loads(data)
            print(f"Received: {r}")
            if r == "END":
                print("server completed")
                break
        else:
            await asyncio.sleep(0.1)

    print('Close the socket')
    writer.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(async_client(loop))
loop.close()

错误:

>python echo_client.py
Request counter: 5

b'{"key":0}{"key":1}{"key":2}{"key":3}{"key":4}"END"'

Traceback (most recent call last):
  File "echo_client.py", line 32, in <module>
    loop.run_until_complete(async_client(loop))
  File "C:\Program Files (x86)\Anaconda\lib\asyncio\base_events.py", line 587, in run_until_complete
    return future.result()
  File "echo_client.py", line 21, in async_client
    r = orjson.loads(data)

orjson.JSONDecodeError: trailing characters at line 1 column 10: line 1 column 1 (char 0)

标签: pythonsocketspython-asyncio

解决方案


我认为问题要简单得多:writelines没有做你认为的事情。它不插入换行符,它只是写入你给它的任何数据。这就是为什么readline()您的客户端会获取有效负载并"END"连接在一起的原因。这也是为什么你需要write_eof在另一个方向。

如果你想写一行,那么只需在你的有效载荷之后写一个换行符(字节)。您可以在为您处理它的函数中抽象它:

async def write_msg(writer, msg):
    writer.write(orjson.dumps(msg))
    writer.write('\n')
    await writer.drain()

async def read_msg(reader):
    line = await reader.readline()
    return orjson.loads(line)

您可以在客户端和服务器上使用它们进行通信。

顺便说一句,您可能应该切换到较新的asyncio.run()API,它使用单个异步入口点创建并正确拆除事件循环。您的服务器设置如下所示:

async def main():
    await asyncio.start_server(handleClient, '127.0.0.1', 4000)
    await server.wait_closed()

asyncio.run(main())

推荐阅读