go - 与 gRPC 客户端重新连接的正确方法
问题描述
我有一个 Go gRPC 客户端连接到在我的 k8s 集群中的不同 pod 中运行的 gRPC 服务器。
它运行良好,接收和处理请求。
我现在想知道在 gRPC 服务器 pod 被回收的情况下如何最好地实现弹性。
据我所知,clientconn.go 代码应该自动处理重新连接,但我无法让它工作,而且我担心我的实现一开始就不正确。
从 main 调用代码:
go func() {
if err := gRPCClient.ProcessRequests(); err != nil {
log.Error("Error while processing Requests")
//do something here??
}
}()
我在 gRPCClient 包装模块中的代码:
func (grpcclient *gRPCClient) ProcessRequests() error {
defer grpcclient.Close()
for {
request, err := reqclient.stream.Recv()
log.Info("Request received")
if err == io.EOF {
break
}
if err != nil {
//when pod is recycled, this is what's hit with err:
//rpc error: code = Unavailable desc = transport is closing"
//what is the correct pattern for recovery here so that we can await connection
//and continue processing requests once more?
//should I return err here and somehow restart the ProcessRequests() go routine in the
//main funcition?
break
} else {
//the happy path
//code block to process any requests that are received
}
}
return nil
}
func (reqclient *RequestClient) Close() {
//this is called soon after the conneciton drops
reqclient.conn.Close()
}
编辑:Emin Laletovic 在下面优雅地回答了我的问题,并在很大程度上得到了它。我必须对 waitUntilReady 函数进行一些更改:
func (grpcclient *gRPCClient) waitUntilReady() bool {
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) //define how long you want to wait for connection to be restored before giving up
defer cancel()
currentState := grpcclient.conn.GetState()
stillConnecting := true
for currentState != connectivity.Ready && stillConnecting {
//will return true when state has changed from thisState, false if timeout
stillConnecting = grpcclient.conn.WaitForStateChange(ctx, currentState)
currentState = grpcclient.conn.GetState()
log.WithFields(log.Fields{"state: ": currentState, "timeout": timeoutDuration}).Info("Attempting reconnection. State has changed to:")
}
if stillConnecting == false {
log.Error("Connection attempt has timed out.")
return false
}
return true
}
解决方案
RPC 连接由 自动处理clientconn.go
,但这并不意味着流也会被自动处理。
流一旦断开,无论是由于 RPC 连接断开还是其他原因,都无法自动重新连接,并且您需要在 RPC 连接备份后从服务器获取新流。
等待 RPC 连接进入READY
状态并建立新流的伪代码可能如下所示:
func (grpcclient *gRPCClient) ProcessRequests() error {
defer grpcclient.Close()
go grpcclient.process()
for {
select {
case <- grpcclient.reconnect:
if !grpcclient.waitUntilReady() {
return errors.New("failed to establish a connection within the defined timeout")
}
go grpcclient.process()
case <- grpcclient.done:
return nil
}
}
}
func (grpcclient *gRPCClient) process() {
reqclient := GetStream() //always get a new stream
for {
request, err := reqclient.stream.Recv()
log.Info("Request received")
if err == io.EOF {
grpcclient.done <- true
return
}
if err != nil {
grpcclient.reconnect <- true
return
} else {
//the happy path
//code block to process any requests that are received
}
}
}
func (grpcclient *gRPCClient) waitUntilReady() bool {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) //define how long you want to wait for connection to be restored before giving up
defer cancel()
return grpcclient.conn.WaitForStateChange(ctx, conectivity.Ready)
}
推荐阅读
- kubernetes - 获取 Kubernetes 部署的 pod 的推荐方法是什么?
- android - 上传免安装应用时出现错误 736569013
- javascript - Chrome 扩展通过 POST 将选定的文本发送到外部 API
- html - 无法在我的网页中设置字体系列
- html - HTML tbody only 只有一个标题列宽
- php - 您可以扩展别名吗?如果可以的话,如何扩展?
- ffmpeg - 应该如何开始使用 ffmpeg 的 API?
- java - 在网格布局中动态添加视图不采用指定的大小
- php - 无法在 codeigniter 中显示自己的错误消息
- r - R - 引用替换