Correct way to reconnect with a gRPC client

Question

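I have a Go gRPC client connected to a gRPC server running in a different pod in my k8s cluster.

It works well, receiving and processing requests.

I am now wondering how best to make this resilient for the case where the gRPC server pod gets recycled.

As far as I can tell, the clientconn.go code should handle reconnection automatically, but I can't get it to work, and I worry that my implementation is wrong in the first place.

Calling code from main: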

go func() {
    if err := gRPCClient.ProcessRequests(); err != nil {
        log.Error("Error while processing Requests")
        //do something here??
    }
}()

My code in the gRPCClient wrapper module:

func (grpcclient *gRPCClient) ProcessRequests() error {
    defer grpcclient.Close()

    for {
        request, err := grpcclient.stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            //when the pod is recycled, this is what's hit, with err:
            //rpc error: code = Unavailable desc = transport is closing

            //what is the correct pattern for recovery here so that we can await the connection
            //and continue processing requests once more?
            //should I return err here and somehow restart the ProcessRequests() goroutine in the
            //main function?
            break
        }

        //the happy path
        log.Info("Request received")
        //code block to process any requests that are received
    }

    return nil
}

func (grpcclient *gRPCClient) Close() {
    //this is called soon after the connection drops
    grpcclient.conn.Close()
}

Edit: Emin Laletovic kindly answered my question below and got me most of the way there. I had to make a few changes to the waitUntilReady function:

func (grpcclient *gRPCClient) waitUntilReady() bool {
    //define how long you want to wait for the connection to be restored before giving up
    timeoutDuration := 300 * time.Second
    ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
    defer cancel()

    currentState := grpcclient.conn.GetState()
    stillConnecting := true

    for currentState != connectivity.Ready && stillConnecting {
        //returns true when the state has changed from currentState, false on timeout
        stillConnecting = grpcclient.conn.WaitForStateChange(ctx, currentState)
        currentState = grpcclient.conn.GetState()
        log.WithFields(log.Fields{"state": currentState, "timeout": timeoutDuration}).Info("Attempting reconnection. State has changed to:")
    }

    if !stillConnecting {
        log.Error("Connection attempt has timed out.")
        return false
    }

    return true
}

Tags: go, kubernetes, network-programming, grpc

Answer


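The RPC connection is handled automatically by clientconn.go, but that does not mean the stream is handled automatically too.

Once a stream breaks, whether because the RPC connection dropped or for some other reason, it cannot reconnect by itself. You need to obtain a new stream from the server once the RPC connection is back up.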

Pseudocode that waits for the RPC connection to reach the READY state and then establishes a new stream might look like this:

func (grpcclient *gRPCClient) ProcessRequests() error {
    defer grpcclient.Close()

    go grpcclient.process()
    for {
        select {
        case <-grpcclient.reconnect:
            if !grpcclient.waitUntilReady() {
                return errors.New("failed to establish a connection within the defined timeout")
            }
            go grpcclient.process()
        case <-grpcclient.done:
            return nil
        }
    }
}

func (grpcclient *gRPCClient) process() {
    reqclient := GetStream() //always get a new stream
    for {
        request, err := reqclient.stream.Recv()
        if err == io.EOF {
            grpcclient.done <- true
            return
        }
        if err != nil {
            grpcclient.reconnect <- true
            return
        }

        //the happy path
        log.Info("Request received")
        //code block to process any requests that are received
    }
}

func (grpcclient *gRPCClient) waitUntilReady() bool {
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) //define how long you want to wait for the connection to be restored before giving up
    defer cancel()
    for {
        state := grpcclient.conn.GetState()
        if state == connectivity.Ready {
            return true
        }
        //WaitForStateChange returns true once the state moves away from the state passed in,
        //and false if ctx expires first
        if !grpcclient.conn.WaitForStateChange(ctx, state) {
            return false
        }
    }
}
