首页 > 解决方案 > 将服务重定向到双向 grpc

问题描述

我目前正在开发一个 grpc 服务器,它将接收来自第一台服务器的流式 grpc 调用并将这些调用重定向到第二台服务器,并将来自第二台服务器的响应作为流重定向到第一个服务器。

我有 2 个 proto 文件第一个 proto

第一个文件:

syntax = "proto3";

package first.proto.pack;

service FirstProtoService {
  rpc StreamingCall(stream RequestToFirstServer) returns (stream ResponseForFirstServer){}
}

message RequestToFirstServer {
    oneof firstStreamingRequest {
        int32 x = 1;
        int32 y = 2;
    }
}

message ResponseForFirstServer {
  string someprocessedinformation = 1;
}

第二个文件:

syntax = "proto3";

package second.proto.pack;

service SecondProtoService {
  rpc StreamingCall(stream RequestToSecondServer) returns (stream ResponseFromSecondServer){}
}

message RequestToSecondServer {
  oneof secondStreamingRequest {
    int32 processedX = 1;
    int32 procesdedY = 2;
  }
}

message ResponseFromSecondServer {
  string computedInformation = 1;
}

第一个服务器知道第一个原始文件,但不知道第二个。

第二个服务器知道第二个原始文件,但不知道第一个。

中间服务器知道第一个和第二个原型。

需要编写一个服务器,将请求从一台服务器从一台服务器传输到另一台服务器

我开始在 Java 上编写它。但是面临向第二台服务器发送大量请求的问题

我的服务中间实现在 Java 上的样子:

package middle.server.pack;


import first.proto.pack.First;
import first.proto.pack.FirstProtoServiceGrpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import second.proto.pack.Second;
import second.proto.pack.SecondProtoServiceGrpc;

import java.util.logging.LogManager;
import java.util.logging.Logger;

public class MiddleService extends FirstProtoServiceGrpc.FirstProtoServiceImplBase {
    private final ManagedChannel channel = ManagedChannelBuilder.forTarget("localhost:8080").build();
    private final Logger logger = LogManager.getLogManager().getLogger(MiddleService.class.getName());

    @Override
    public StreamObserver<First.RequestToFirstServer> streamingCall(StreamObserver<First.ResponseForFirstServer> responseObserver) {
        return new StreamObserver<First.RequestToFirstServer>() {
            @Override
            public void onNext(First.RequestToFirstServer value) {
                SecondProtoServiceGrpc.SecondProtoServiceStub stub = SecondProtoServiceGrpc.newStub(channel);
                StreamObserver<Second.RequestToSecondServer> requestObserver = stub.streamingCall(
                        new StreamObserver<Second.ResponseFromSecondServer>() {
                            @Override
                            public void onNext(Second.ResponseFromSecondServer value) {
                                doProcessOnResponse(value);
                                First.ResponseForFirstServer responseForFirstServer =
                                        mapToFirstResponse(value);
                                responseObserver.onNext(responseForFirstServer);
                            }

                            @Override
                            public void onError(Throwable t) {
                                logger.info(t.getMessage());
                            }

                            @Override
                            public void onCompleted() {
                                logger.info("sucess");
                            }
                        }
                );
                Second.RequestToSecondServer requestToSecondServer = mapToSecondRequest(value);
                requestObserver.onNext(requestToSecondServer);
                requestObserver.onCompleted();
            }

            @Override
            public void onError(Throwable t) {
                logger.info(t.getMessage());
            }

            @Override
            public void onCompleted() {
                logger.info("Everything okay");
            }
        };
    }
}

在中间服务器端的第一个客户端发出请求后,我收到以下错误:

CANCELLED: Failed to read message.
CANCELLED: io.grpc.Context was cancelled without error

我知道我做错了。所以问题是如何使它正确,或者如果我不能在 java 上制作它,我可以用任何其他语言制作它吗?

标签: javajava-streamgrpcgrpc-javajava-server

解决方案


据我所知,问题在于onNext()您每次都在启动 new 时streamingCall收到来自. 即使它是来自第一台服务器的同一个流,第一条消息创建自己的流到第二台服务器,第二条消息创建自己的流到第二台服务器,等等。这解释了为什么你会遇到“发送大量请求到第二台服务器”。SecondProtoServiceGrpcMiddleServiceFirstProtoServiceGrpc

相反,中间层应该反映第一台服务器所做的事情(对于这个例子,我们只看方向第一 -> 中间 -> 第二)。当第一个服务器向中间创建一个新流 (1) 时,中间向第二个服务器创建一个新流 (2)。当中间服务器在流 (1) 上(从第一台服务器)获得消息时,它会将其发送到流 (2)。当流 (1) 关闭时,流 (2) 也关闭。等等。

相反的方向也一样,反之亦然。


推荐阅读