首页 > 解决方案 > 如何在 python 中配置 gRPC HTTP/2 流控制

问题描述

我有一个带有以下原型的 gRPC 服务器:

syntax = "proto3";

service MyServicer {
  rpc DoSomething(stream InputBigData) returns (stream OutputBigData) {}
}
message InputBigData {
    bytes data = 1;
}
message OutputBigData {
    bytes data = 1;
}

我的服务器是使用以下 Python 代码创建的:

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                     options=[('grpc.max_receive_message_length', -1),
                              ('grpc.max_send_message_length', -1))])

max_receive_message_length 和 max_send_message_length 设置为 -1 以允许传输大消息(通常为 8Mb)。客户端也定义了相同的选项。

案例 1:考虑客户端向服务器发送 InputBigData 的速率高于服务器所能承受的速率。如何配置输入流中可以排队多少 InputBigData(或字节)?

案例 2:考虑客户端从服务器读取响应 OutputBigData 的速率低于客户端所能承受的速率。如何配置输出流中可以排队多少个 OutputBigData(或字节)?

我知道 gRPC 流控制基于 HTTP/2:https : //httpwg.org/specs/rfc7540.html#FlowControl 我尝试将 grpc.http2.write_buffer_size 设置为 67108864(似乎是最大值)但什么也没发生。

这是一个突出案例 2 的实现:

# server.py
from concurrent import futures

import grpc
import myservicer_pb2_grpc, myservicer_pb2


class MyServicer(myservicer_pb2_grpc.MyServicer):

    def DoSomething(self, request_iterator, target, **kwargs):
        big_data = b'0' * 1920*1080*4
        for r in request_iterator:
            print("server received input big data")
            yield myservicer_pb2.OutputBigData(data=big_data)
            print("server sent output big data")


if __name__ == '__main__':
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10),
                         options=[('grpc.max_receive_message_length', -1),
                                  ('grpc.max_send_message_length', -1)])
    myservicer_pb2_grpc.add_MyServicerServicer_to_server(
        MyServicer(), server)
    server.add_insecure_port("[::]:50051")
    server.start()
    server.wait_for_termination()

# client.py
import time
import grpc

import myservicer_pb2_grpc
import myservicer_pb2


def big_data_generator():
    big_data = b'0' * 1920*1080*4
    for i in range(100):
        yield myservicer_pb2.InputBigData(data=big_data)


def run():
    with grpc.insecure_channel('localhost:50051',
                               options=[('grpc.max_send_message_length', -1),
                                        ('grpc.max_receive_message_length', -1)]) as channel:
        stub = myservicer_pb2_grpc.MyServicerStub(channel)
        res = stub.DoSomething(big_data_generator())

        for r in res:
            print("Client received data")
            time.sleep(10)

if __name__ == '__main__':
    run()

10 秒后,我的服务器输出为:

server received input big data
server sent output big data
server received input big data
server sent output big data
server received input big data

我的客户输出是:

Client received data

我的服务器收到了 3 个 InputBigData 并发送了 2 个 OutputBigData。它现在被阻塞,直到客户端使用输出数据。在这种情况下,我想增加(2 或 3 倍)输出缓冲区大小,以便即使客户端延迟使用结果,它也可以继续处理更多输入数据。

标签: pythongrpchttp2grpc-python

解决方案


感谢您的详细问题。我尝试了您的示例,但仍然无法调整 gRPC 以自由增加其窗口大小。

gRPC 通道参数可以在这里找到。流控制实现在这里只有几个可能会影响流控制,它们是:

  • grpc.http2.bdp_probe=0:禁用自动窗口增加
  • grpc.http2.max_frame_size: HTTP/2 最大帧大小
  • grpc.http2.write_buffer_size:不是真正的流控制选项,它用于 GRPC_WRITE_BUFFER_HINT(无阻塞写入)。此外,gRPC Python 尚不支持 GRPC_WRITE_BUFFER_HINT

没有可以触发窗口大小更新的参数。默认窗口大小为 64KB。gRPC 将通过 BDP 估计增加窗口大小。例如,在我的笔记本电脑上,客户端出站窗口大小增加到 8380679 (~8MB)。但我还没有找到手动干预这个过程的方法。

因此,不幸的是,您可能需要应用程序级缓冲。您可以在异步中使用协程或在客户端和服务器端使用线程安全队列进行线程化。


推荐阅读