首页 > 解决方案 > 如何为 google pub sub gRPC 调用配置代理凭据?

问题描述

我正在尝试从代理后面连接到谷歌云平台发布/订阅。

使用使用 google pub 子客户端的 Spring lib "org.springframework.cloud:spring-cloud-gcp-starter-pubsub",它为了对订阅进行拉调用,使用 gRPC 调用。

为了设置代理,我可以使用GRPC_PROXY_EXP环境变量,但我还需要凭据才能通过此代理。

我尝试了几种方法,包括配置org.springframework.cloud.gcp.pubsub.support.SubscriberFactory类似于此处的https://medium.com/google-cloud/accessing-google-cloud-apis-though-a-proxy-fe46658b5f2a

@Bean
    fun inboundQuotationsChannelAdapter(
        @Qualifier("inboundQuotationsMessageChannel") quotationsChannel: MessageChannel,
        mpProperties: ConfigurationProperties,
        defaultSubscriberFactory: SubscriberFactory
    ): PubSubInboundChannelAdapter {

        Authenticator.setDefault(ProxyAuthenticator("ala","bala"))

        val proxySubscriberFactory: DefaultSubscriberFactory = defaultSubscriberFactory as DefaultSubscriberFactory
        proxySubscriberFactory.setCredentialsProvider(ProxyCredentialsProvider(getCredentials()))
        val headers = mutableMapOf(Pair("Proxy-Authorization", getBasicAuth()))
        proxySubscriberFactory.setChannelProvider(SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
            .setHeaderProvider(FixedHeaderProvider.create(headers)).build())


        val proxySubscriberTemplate = PubSubSubscriberTemplate(proxySubscriberFactory)

        val adapter = PubSubInboundChannelAdapter(proxySubscriberTemplate, mpProperties.gcp.quotationSubscription)
        adapter.outputChannel = quotationsChannel
        adapter.ackMode = AckMode.MANUAL
        adapter.payloadType = ActivityStateChanged::class.java
        return adapter
    }


    @Throws(IOException::class)
    fun getCredentials(): GoogleCredentials {
        val httpTransportFactory = getHttpTransportFactory(
            "127.0.0.1", 3128, "ala", "bala"
        )
        return GoogleCredentials.getApplicationDefault(httpTransportFactory)
    }

    fun getHttpTransportFactory(
        proxyHost: String?,
        proxyPort: Int,
        proxyUsername: String?,
        proxyPassword: String?
    ): HttpTransportFactory? {
        val proxyHostDetails = HttpHost(proxyHost, proxyPort)
        val httpRoutePlanner: HttpRoutePlanner = DefaultProxyRoutePlanner(proxyHostDetails)
        val credentialsProvider: CredentialsProvider = BasicCredentialsProvider()
        credentialsProvider.setCredentials(
            AuthScope(proxyHostDetails.hostName, proxyHostDetails.port),
            UsernamePasswordCredentials(proxyUsername, proxyPassword)
        )
        val httpClient: HttpClient = ApacheHttpTransport.newDefaultHttpClientBuilder()
            .setRoutePlanner(httpRoutePlanner)
            .setProxyAuthenticationStrategy(ProxyAuthenticationStrategy.INSTANCE)
            .setDefaultCredentialsProvider(credentialsProvider)
            .setDefaultRequestConfig(
                RequestConfig.custom()
                    .setAuthenticationEnabled(true)
                    .setProxy(proxyHostDetails)
                    .build())
            .addInterceptorLast(HttpRequestInterceptor { request, context ->
                request.addHeader(
                    BasicHeader(
                        "Proxy-Authorization",
                        getBasicAuth()
                    )
                )
            })
            .build()
        val httpTransport: HttpTransport = ApacheHttpTransport(httpClient)
        return HttpTransportFactory { httpTransport }
    }

还尝试使用@GRpcGlobalInterceptorLogNet https://github.com/LogNet/grpc-spring-boot-starter

    @Bean
    @GRpcGlobalInterceptor
    fun globalServerInterceptor(): ServerInterceptor {
        return GrpcServerInterceptor(configurationProperties)
    }

    @Bean
    @GRpcGlobalInterceptor
    fun globalClientInterceptor(): ClientInterceptor {
        return GrpcClientInterceptor(configurationProperties)
    }

class GrpcClientInterceptor(private val configurationProperties: ConfigurationProperties) :
    ClientInterceptor {

    private val proxyUsername = configurationProperties.http.proxy.username
    private val proxyPassword = configurationProperties.http.proxy.password
    private val proxyHeaderKey = Metadata.Key.of("Proxy-Authorization", Metadata.ASCII_STRING_MARSHALLER)

    private fun getBasicAuth(): String {
        val usernameAndPassword = "$proxyUsername:$proxyPassword"
        val encoded = Base64.getEncoder().encodeToString(usernameAndPassword.toByteArray())
        return "Basic $encoded"
    }

    override fun <ReqT, RespT> interceptCall(
        method: MethodDescriptor<ReqT, RespT>?,
        callOptions: CallOptions?, next: Channel
    ): ClientCall<ReqT, RespT>? {
        return object : SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
            override fun start(responseListener: Listener<RespT>?, headers: Metadata) {
                headers.put(proxyHeaderKey, getBasicAuth())
                super.start(object : SimpleForwardingClientCallListener<RespT>(responseListener) {
                    override fun onHeaders(headers: Metadata) {
                        super.onHeaders(headers)
                    }
                }, headers)
            }
        }
    }
}
class GrpcServerInterceptor(private val configurationProperties: ConfigurationProperties) :
    ServerInterceptor {

    private val proxyUsername = configurationProperties.http.proxy.username
    private val proxyPassword = configurationProperties.http.proxy.password

    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        call: ServerCall<ReqT, RespT>?,
        headers: io.grpc.Metadata?,
        next: ServerCallHandler<ReqT, RespT>?
    ): ServerCall.Listener<ReqT> {
        val proxyHeaderKey = Metadata.Key.of("Proxy-Authorization", Metadata.ASCII_STRING_MARSHALLER)
        if (!headers!!.containsKey(proxyHeaderKey))
            headers!!.put(proxyHeaderKey, getBasicAuth())
        return next!!.startCall(call, headers)
    }

    private fun getBasicAuth(): String {
        val usernameAndPassword = "$proxyUsername:$proxyPassword"
        val encoded = Base64.getEncoder().encodeToString(usernameAndPassword.toByteArray())
        return "Basic $encoded"
    }
}

(也直接在类级别上尝试了注释-ofc 它不起作用)

还尝试使用@GrpcGlobalServerInterceptor@GrpcGlobalClientInterceptor来自https://github.com/yidongnan/grpc-spring-boot-starter/tree/v2.12.0.RELEASE但这种依赖完全崩溃了应用程序

标签: springproxygrpcgoogle-cloud-pubsubgrpc-java

解决方案


在这里,您可以找到有关如何将 Java API 文档中的代理凭据设置为configure-a-proxy的示例;

public CloudTasksClient getService() throws IOException {
  TransportChannelProvider transportChannelProvider =
      CloudTasksStubSettings.defaultGrpcTransportProviderBuilder()
          .setChannelConfigurator(
              new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
                @Override
                public ManagedChannelBuilder apply(ManagedChannelBuilder managedChannelBuilder) {
                  return managedChannelBuilder.proxyDetector(
                      new ProxyDetector() {
                        @Nullable
                        @Override
                        public ProxiedSocketAddress proxyFor(SocketAddress socketAddress)
                            throws IOException {
                          return HttpConnectProxiedSocketAddress.newBuilder()
                              .setUsername(PROXY_USERNAME)
                              .setPassword(PROXY_PASSWORD)
                              .setProxyAddress(new InetSocketAddress(PROXY_HOST, PROXY_PORT))
                              .setTargetAddress((InetSocketAddress) socketAddress)
                              .build();
                        }
                      });
                }
              })
          .build();
  CloudTasksSettings cloudTasksSettings =
      CloudTasksSettings.newBuilder()
          .setTransportChannelProvider(transportChannelProvider)
          .build();
  return CloudTasksClient.create(cloudTasksSettings);
}

 

考虑一下它说 gRPC 代理当前处于试验阶段的注释。


推荐阅读