python - 如何在 python 中配置 gRPC HTTP/2 流控制
问题描述
我有一个带有以下原型的 gRPC 服务器:
syntax = "proto3";
service MyServicer {
rpc DoSomething(stream InputBigData) returns (stream OutputBigData) {}
}
message InputBigData {
bytes data = 1;
}
message OutputBigData {
bytes data = 1;
}
我的服务器是使用以下 Python 代码创建的:
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=[('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1))])
max_receive_message_length 和 max_send_message_length 设置为 -1 以允许传输大消息(通常为 8Mb)。客户端也定义了相同的选项。
案例 1:考虑客户端向服务器发送 InputBigData 的速率高于服务器所能承受的速率。如何配置输入流中可以排队多少 InputBigData(或字节)?
案例 2:考虑客户端从服务器读取响应 OutputBigData 的速率低于客户端所能承受的速率。如何配置输出流中可以排队多少个 OutputBigData(或字节)?
我知道 gRPC 流控制基于 HTTP/2:https : //httpwg.org/specs/rfc7540.html#FlowControl 我尝试将 grpc.http2.write_buffer_size 设置为 67108864(似乎是最大值)但什么也没发生。
这是一个突出案例 2 的实现:
# server.py
from concurrent import futures
import grpc
import myservicer_pb2_grpc, myservicer_pb2
class MyServicer(myservicer_pb2_grpc.MyServicer):
def DoSomething(self, request_iterator, target, **kwargs):
big_data = b'0' * 1920*1080*4
for r in request_iterator:
print("server received input big data")
yield myservicer_pb2.OutputBigData(data=big_data)
print("server sent output big data")
if __name__ == '__main__':
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
options=[('grpc.max_receive_message_length', -1),
('grpc.max_send_message_length', -1)])
myservicer_pb2_grpc.add_MyServicerServicer_to_server(
MyServicer(), server)
server.add_insecure_port("[::]:50051")
server.start()
server.wait_for_termination()
# client.py
import time
import grpc
import myservicer_pb2_grpc
import myservicer_pb2
def big_data_generator():
big_data = b'0' * 1920*1080*4
for i in range(100):
yield myservicer_pb2.InputBigData(data=big_data)
def run():
with grpc.insecure_channel('localhost:50051',
options=[('grpc.max_send_message_length', -1),
('grpc.max_receive_message_length', -1)]) as channel:
stub = myservicer_pb2_grpc.MyServicerStub(channel)
res = stub.DoSomething(big_data_generator())
for r in res:
print("Client received data")
time.sleep(10)
if __name__ == '__main__':
run()
10 秒后,我的服务器输出为:
server received input big data
server sent output big data
server received input big data
server sent output big data
server received input big data
我的客户输出是:
Client received data
我的服务器收到了 3 个 InputBigData 并发送了 2 个 OutputBigData。它现在被阻塞,直到客户端使用输出数据。在这种情况下,我想增加(2 或 3 倍)输出缓冲区大小,以便即使客户端延迟使用结果,它也可以继续处理更多输入数据。
解决方案
感谢您的详细问题。我尝试了您的示例,但仍然无法调整 gRPC 以自由增加其窗口大小。
gRPC 通道参数可以在这里找到。流控制实现在这里只有几个可能会影响流控制,它们是:
grpc.http2.bdp_probe=0
:禁用自动窗口增加grpc.http2.max_frame_size
: HTTP/2 最大帧大小grpc.http2.write_buffer_size
:不是真正的流控制选项,它用于 GRPC_WRITE_BUFFER_HINT(无阻塞写入)。此外,gRPC Python 尚不支持 GRPC_WRITE_BUFFER_HINT
没有可以触发窗口大小更新的参数。默认窗口大小为 64KB。gRPC 将通过 BDP 估计增加窗口大小。例如,在我的笔记本电脑上,客户端出站窗口大小增加到 8380679 (~8MB)。但我还没有找到手动干预这个过程的方法。
因此,不幸的是,您可能需要应用程序级缓冲。您可以在异步中使用协程或在客户端和服务器端使用线程安全队列进行线程化。
推荐阅读
- python-3.x - 使用cherrypy构建的NGINX反向代理微服务
- orm - SailsJS,Waterline 使用选择填充记录
- gradle - 无法在 Gradle 插件块中参数化版本
- lit-element - 应该在模板中使用 ("...") 吗?
- xamarin - 如何使我的界面适应所有类型的设备(iOS/Android)?
- javascript - 如何将 PHP 和 HTML 代码 iike 文本放入 javascript var。(内部HTML)?
- python - 遍历作为数组的字典值
- regex - 使用正则表达式将字符串替换为随机数/字符串
- perl - 使用 perl 在所有文件中搜索和更新字符串
- .net - 通过 Microsoft Graph API 创建新的 AD B2C 用户