首页 > 解决方案 > 用于双向流的 GRPC 刷新令牌

问题描述

TLDR:我正在寻找一种方法来更新每次调用的打开流上的标头,stream.Send(msg)而无需关闭流并打开新流。

概括

我有一个 GRPC 客户端和服务器,用于处理双向流。要向服务器进行身份验证,客户端必须在请求标头中发送 JWT,设置为“授权”。令牌有效期为 30 分钟。令牌过期后,服务器将终止连接。

我正在寻找一种从客户端刷新我的授权令牌并保持流打开的方法。客户端应该在循环中运行,每 30 分钟使用更新的令牌和更新的有效负载执行一个新请求。我还没有看到从客户端更新已打开流的标头的方法。

让我们看一些代码来了解客户端的样子。下面的代码有一个创建新客户端实例的函数,以及另一个建立与 GRPC 服务器连接的函数。

func NewWatchClient(config *Config, logger *logrus.Logger) (*WatchClient, error) {
    cc, err := newConnection(config, logger)
    if err != nil {
        return nil, err
    }

    service := proto.NewWatchServiceClient(cc)

    return &WatchClient{
        config:  config,
        conn:    cc,
        logger:  entry,
        service: service,
    }, nil
}

func newConnection(config *Config, logger *logrus.Logger) (*grpc.ClientConn, error) {
    address := fmt.Sprintf("%s:%d", config.Host, config.Port)

    // rpcCredential implements credentials.PerRPCCredentials
    rpcCredential := newTokenAuth(config.Auth, config.TenantID)

    return grpc.Dial(
        address,
        grpc.WithPerRPCCredentials(rpcCredential),
    )
}

查看newConnection上面的函数,我们可以看到调用另一个函数 ,newTokenAuth来创建身份验证令牌,效果很好。此函数返回一个实现PerRPCCredentials接口的结构。

有两种方法可以设置请求的授权。

  1. 使用grpc.WithPerRPCCredentials在创建与服务器的连接时添加授权。

  2. 使用grpc.PerRPCCredentials将授权添加到在与服务器的连接上打开的每个流。

在这种情况下,我使用grpc.WithPerRPCCredentials在创建与服务器的连接时附加令牌。

现在,让我们看一下PerRPCCredentials的定义。

type PerRPCCredentials interface {
    // GetRequestMetadata gets the current request metadata, refreshing
    // tokens if required. This should be called by the transport layer on
    // each request, and the data should be populated in headers or other
    // context. If a status code is returned, it will be used as the status
    // for the RPC. uri is the URI of the entry point for the request.
    // When supported by the underlying implementation, ctx can be used for
    // timeout and cancellation. Additionally, RequestInfo data will be
    // available via ctx to this call.
    // TODO(zhaoq): Define the set of the qualified keys instead of leaving
    // it as an arbitrary string.
    GetRequestMetadata(ctx context.Context, uri ...string) (map[string]string, error)
    // RequireTransportSecurity indicates whether the credentials requires
    // transport security.
    RequireTransportSecurity() bool
}

该接口要求您定义两个方法。的文档GetRequestMetadata

GetRequestMetadata 获取当前请求元数据,如果需要刷新令牌

因此,看起来我的实现PerRPCCredentials应该能够为我的流或连接处理令牌刷新。让我们看看我的实现PerRPCCredentials

// tokenAuth implements the PerRPCCredentials interface
type tokenAuth struct {
    tenantID       string
    tokenRequester auth.PlatformTokenGetter
    token          string
}

// RequireTransportSecurity leave as false for now
func (tokenAuth) RequireTransportSecurity() bool {
    return false
}

// GetRequestMetadata sets the http header prior to transport
func (t tokenAuth) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
    token, err := t.tokenRequester.GetToken()
    if err != nil {
        return nil, err
    }
    t.token = token

    go func() {
        time.Sleep(25 * time.Minute)
        token, _ := t.tokenRequester.GetToken()
        t.token = token
    }()

    return map[string]string{
        "tenant-id": t.tenantID,
        "authorization":     "Bearer " + t.token,
    }, nil
}

如您所见,调用GetRequestMetadata将建立一个 go 例程,该例程将尝试每 25 分钟刷新一次令牌。在这里添加一个 goroutine 可能不是正确的方法。这是试图让 auth 标头刷新的尝试,但这是行不通的。

让我们看一下流。

func (w WatchClient) CreateWatch() error {
    topic := &proto.Request{SelfLink: w.config.TopicSelfLink}

    stream, err := w.service.CreateWatch(context.Background())
    if err != nil {
        return err
    }

    for {
        err = stream.Send(topic)
        if err != nil {
            return err
        }
        time.Sleep(25 * time.Minute)
    }
}

客户端每 25 分钟在流上发送一条消息。我希望在这里得到的是,当stream.Send被调用时,更新的令牌也会被发送。

此函数GetRequestMetadata仅被调用一次,无论我是否通过设置身份验证grpc.WithPerRPCCredentialsgrpc.PerRPCCredsCallOption似乎都无法更新授权标头。

如果您知道我在尝试使用PerRPCCredentialsfor 令牌刷新时遗漏了什么,或者如果您知道可以通过拦截器或其他方式完成它的另一种方式,请告诉我。

谢谢你。

标签: gogrpcgrpc-go

解决方案


标头在 RPC 开始时发送,并且在 RPC 期间无法更新。如果您需要在流的生命周期内发送数据,则它需要成为原型定义中请求消息的一部分。


推荐阅读