首页 > 解决方案 > 使用多个 ResponseObserver.onNext 会卡住服务 - gRPC Java

问题描述

我想用 gRPC 以 50 个块发送数据,但是当responseObserver.onNext多次运行时,它只是卡住并且不会发送数据。但是当我停止流时,我得到了数据。

这是我的代码:

// List<List<MyClass>> listOfMyClass
for (List<MyClass> myClasses : listOfMyClass) {
  responseObserver.onNext(buildReply(myClasses));
}
responseObserver.onCompleted();

这会让它卡住。但是,如果我只运行responseObserver.onNext(buildReply(myClasses));一次,我会立即获得数据。

我的原型:

message Request {
    string number = 1;
}

message Reply {
    repeated CustomMessage results = 1;
}

service Service {
    rpc MyRequest (Request) returns (stream Reply) {}
}

我一直在使用一个名为https://github.com/uw-labs/bloomrpc的 GUI ,它应该可以轻松显示流。

标签: javagrpcgrpc-java

解决方案


StreamObserver接口是非阻塞的,不会为出站数据提供背压。为了给客户端提供背压,您需要将StreamObserver客户端用于将数据发送到 ServerCallStreamObserver 的参数进行转换,并观察“就绪”位。像这样的东西:

ClientCallStreamObserver<MyClass> clientCallStreamResponseObserver =
      (ClientCallStreamObserver<File>) responseObserver;
clientCallStreamObserver.setOnReadyHandler(new Runnable() {
    public void run() {
      sendUntilNotReady(clientCallStreamResponseObserver);
    }
});
sendUntilNotReady(clientCallStreamResponseObserver);

...

private void sendUntilNotReady(ClientCallStreamObserver<MyClass> clientCallStreamResponseObserver) {
  while (clientCallStreamResponseObserver.isReady()) {
    clientCallStreamResponseObserver.onNext(... /* build reply for next item in listOfMyClass */);
  }
}

推荐阅读