首页 > 解决方案 > GRPC 流关闭连接

问题描述

我正在编写一个golang使用grpc. 收到请求后,我应该将此流放入 a Chan,然后 a goroutine处理此请求并发回。但是rpc error: code = Unavailable desc = transport is closing当我在goroutine. 所以我想知道我是否可以传递streamChannel,这个操作是否关闭了连接?

这是在协议缓冲区中识别

service AsrService {
     rpc Recognize(stream RecognizeRequest) returns (stream RecognizeResponse) {}
}

这是使用 grpc 的自动生成

type AsrService_RecognizeServer interface {
    Send(*RecognizeResponse) error
    Recv() (*RecognizeRequest, error)
    grpc.ServerStream
}

这是给陈的流

func (s *ScheduleServer) Recognize(stream 
AsrService_RecognizeServer) error {
    req, err := stream.Recv() // I can use Recv here
    if err == io.EOF || err != nil {
        // do something
    }
    var asrRequest ASRRequest
    asrRequest.stream = &stream //pass stream to Chan
    ASRRequestChan <- &asrRequest

    return nil
}

这是一个处理 Chan 的 goroutine

type ASRRequest struct {
    stream AsrService_RecognizeServer
}

var ClientRequestChan = make(chan *ClientRequest, 200)
func HandlRequestChan() {
    for {
        select {
            case r := <- ClientRequestChan:
                Log.Infof("Chan get request info[%v]", r)
                var rsp RecognizeResponse
                rsp.Code = **
                streamInter := *r.stream
                err = streamInter.Send(&rsp) // I can use Send here
                if err != nil {
                    fmt.Printf("Grpc write failed,err[%v]", err)
                }
                fmt.Printf("return time[%v]\n",time.Now().UnixNano() / 1e6)
        }
    }    
}

然后我收到错误 rpc 错误:代码 = 不可用 desc = 传输正在关闭,那么在将流传递给陈之后是否关闭了?因为如果我不使用 Chan,它可以成功地将结果发送给客户端。

标签: gogrpcgrpc-go

解决方案


我更改策略并使用sync.WaitGroup以确保main goroutine在发回之前不返回stream。我将构建一个goroutine来处理这个问题stream,并且main goroutine直到child goroutine完成才返回。所以连接不会关闭。

var wg sync.WaitGroup
func (s *ScheduleServer) Recognize(stream pb.AsrService_RecognizeServer) error {
    wg.Add(1)
    go s.Recognize_Syn(&wg, stream)

    wg.Wait()
    return nil
}

func (s *ScheduleServer) Recognize_Syn(wg *sync.WaitGroup, stream pb.AsrService_RecognizeServer) error {
    defer wg.Done()
    //do something
    err = stream.Send(&rsp)
    return nil
}

推荐阅读