java - 将服务重定向到双向 grpc
问题描述
我目前正在开发一个 grpc 服务器,它将接收来自第一台服务器的流式 grpc 调用并将这些调用重定向到第二台服务器,并将来自第二台服务器的响应作为流重定向到第一个服务器。
我有 2 个 proto 文件第一个 proto
第一个文件:
syntax = "proto3";
package first.proto.pack;
service FirstProtoService {
rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}
message RequestToFirstServer {
oneof firstStreamingRequest {
int32 x = 1;
int32 y = 2;
}
}
message ResponseForFirstServer {
string someprocessedinformation = 1;
}
第二个文件:
syntax = "proto3";
package second.proto.pack;
service SecondProtoService {
rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}
message RequestToSecondServer {
oneof secondStreamingRequest {
int32 processedX = 1;
int32 procesdedY = 2;
}
}
message ResponseFromSecondServer {
string computedInformation = 1;
}
第一个服务器知道第一个原始文件,但不知道第二个。
第二个服务器知道第二个原始文件,但不知道第一个。
中间服务器知道第一个和第二个原型。
需要编写一个服务器,将请求从一台服务器从一台服务器传输到另一台服务器
我开始在 Java 上编写它。但是面临向第二台服务器发送大量请求的问题
我的服务中间实现在 Java 上的样子:
package middle.server.pack;
import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;
import java.util.logging.LogManager;
import java.util.logging.Logger;
public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());
@Override
public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
return new StreamObserver<First.RequestToFirstServer>() {
@Override
public void onNext(First.RequestToFirstServer value) {
SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
new StreamObserver<Second.ResponseFromSecondServer>() {
@Override
public void onNext(Second.ResponseFromSecondServer value) {
doProcessOnResponse(value);
First.ResponseForFirstServer responseForFirstServer =
mapToFirstResponse(value);
responseObserver.onNext(responseForFirstServer);
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("sucess");
}
}
);
Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
requestObserver.onNext(requestToSecondServer);
requestObserver.onCompleted();
}
@Override
public void onError(Throwable t) {
logger.info(t.getMessage());
}
@Override
public void onCompleted() {
logger.info("Everything okay");
}
};
}
}
在中间服务器端的第一个客户端发出请求后,我收到以下错误:
CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error
我知道我做错了。所以问题是如何使它正确,或者如果我不能在 java 上制作它,我可以用任何其他语言制作它吗?
解决方案
据我所知,问题在于onNext()
您每次都在启动 new 时streamingCall
收到来自. 即使它是来自第一台服务器的同一个流,第一条消息创建自己的流到第二台服务器,第二条消息创建自己的流到第二台服务器,等等。这解释了为什么你会遇到“发送大量请求到第二台服务器”。SecondProtoServiceGrpc
MiddleService
FirstProtoServiceGrpc
相反,中间层应该反映第一台服务器所做的事情(对于这个例子,我们只看方向第一 -> 中间 -> 第二)。当第一个服务器向中间创建一个新流 (1) 时,中间向第二个服务器创建一个新流 (2)。当中间服务器在流 (1) 上(从第一台服务器)获得消息时,它会将其发送到流 (2)。当流 (1) 关闭时,流 (2) 也关闭。等等。
相反的方向也一样,反之亦然。
推荐阅读
- pycharm - 在pycharm中跳转到当前文件/选项卡中的定义的快捷方式
- lisp - common-lisp 中更好的 pythonic `join`
- mysql - 如何使用 Express 将 Node.js 与 MySQL 8.0 连接起来?
- node.js - 如何将数组中的数据一一发送到socket.io
- ruby-on-rails - Rails 5.2 不加载环境变量
- c# - 在 Regex C# 中允许撇号、点、& 符号、数字和字符
- c# - ILogger:如果信息没有真正记录,如何避免做繁重的工作
- angular - 嵌套 *ngFor 中的 Angular 4 ngModel 绑定
- sip - Kamailio,使用扩展向用户发送请求(类似于 Asterisk 扩展)
- javascript - 加载 iframe DOM 时的事件处理程序