spring - 如何为 google pub sub gRPC 调用配置代理凭据?
问题描述
我正在尝试从代理后面连接到谷歌云平台发布/订阅。
我使用的是 Spring 库 "org.springframework.cloud:spring-cloud-gcp-starter-pubsub",它基于 Google Pub/Sub 客户端,
并通过 gRPC 调用对订阅执行拉取(pull)操作。
为了设置代理,我可以使用GRPC_PROXY_EXP
环境变量,但我还需要凭据才能通过此代理。
我尝试了几种方法,包括参照下面这篇文章配置 org.springframework.cloud.gcp.pubsub.support.SubscriberFactory:
https://medium.com/google-cloud/accessing-google-cloud-apis-though-a-proxy-fe46658b5f2a
/**
 * Creates the inbound Pub/Sub channel adapter for the quotation subscription.
 *
 * The injected subscriber factory is reconfigured in place: it receives a proxy-aware
 * credentials provider and a gRPC channel provider that attaches a `Proxy-Authorization`
 * header. A default JVM [Authenticator] with hard-coded proxy credentials is installed
 * as a side effect.
 *
 * NOTE(review): the cast assumes the injected [SubscriberFactory] is a
 * [DefaultSubscriberFactory]; any other implementation fails with a ClassCastException.
 *
 * @param quotationsChannel channel that receives the converted messages
 * @param mpProperties application properties supplying the subscription name
 * @param defaultSubscriberFactory subscriber factory to reconfigure for proxy access
 * @return an adapter with manual acknowledgement and [ActivityStateChanged] payloads
 */
@Bean
fun inboundQuotationsChannelAdapter(
    @Qualifier("inboundQuotationsMessageChannel") quotationsChannel: MessageChannel,
    mpProperties: ConfigurationProperties,
    defaultSubscriberFactory: SubscriberFactory
): PubSubInboundChannelAdapter {
    Authenticator.setDefault(ProxyAuthenticator("ala", "bala"))

    val factory = defaultSubscriberFactory as DefaultSubscriberFactory
    factory.setCredentialsProvider(ProxyCredentialsProvider(getCredentials()))

    // Every gRPC call made through this channel provider carries the proxy credentials header.
    val proxyAuthHeaders = mutableMapOf("Proxy-Authorization" to getBasicAuth())
    val channelProvider = SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
        .setHeaderProvider(FixedHeaderProvider.create(proxyAuthHeaders))
        .build()
    factory.setChannelProvider(channelProvider)

    val subscriberTemplate = PubSubSubscriberTemplate(factory)
    return PubSubInboundChannelAdapter(subscriberTemplate, mpProperties.gcp.quotationSubscription).apply {
        outputChannel = quotationsChannel
        ackMode = AckMode.MANUAL
        payloadType = ActivityStateChanged::class.java
    }
}
/**
 * Loads the application-default Google credentials over an HTTP transport that is routed
 * through the proxy at 127.0.0.1:3128, using hard-coded proxy username and password.
 *
 * @return the application-default credentials obtained via the proxied transport
 */
@Throws(IOException::class)
fun getCredentials(): GoogleCredentials =
    GoogleCredentials.getApplicationDefault(
        getHttpTransportFactory("127.0.0.1", 3128, "ala", "bala")
    )
/**
 * Builds an [HttpTransportFactory] backed by an Apache HTTP client that routes requests
 * through the given proxy and authenticates against it.
 *
 * Proxy credentials are registered with the client's credentials provider, scoped to the
 * proxy host and port. In addition, a request interceptor adds a `Proxy-Authorization`
 * header whose value comes from `getBasicAuth()`.
 *
 * NOTE(review): the interceptor header is produced by `getBasicAuth()`, not derived from
 * [proxyUsername]/[proxyPassword] — confirm both sources agree.
 *
 * @param proxyHost host name or address of the proxy
 * @param proxyPort port of the proxy
 * @param proxyUsername user name registered for the proxy's auth scope
 * @param proxyPassword password registered for the proxy's auth scope
 * @return a factory that always hands out the same proxy-configured transport instance
 */
fun getHttpTransportFactory(
    proxyHost: String?,
    proxyPort: Int,
    proxyUsername: String?,
    proxyPassword: String?
): HttpTransportFactory? {
    val proxy = HttpHost(proxyHost, proxyPort)
    val routePlanner: HttpRoutePlanner = DefaultProxyRoutePlanner(proxy)

    val proxyCredentials: CredentialsProvider = BasicCredentialsProvider().apply {
        setCredentials(
            AuthScope(proxy.hostName, proxy.port),
            UsernamePasswordCredentials(proxyUsername, proxyPassword)
        )
    }

    val requestConfig = RequestConfig.custom()
        .setAuthenticationEnabled(true)
        .setProxy(proxy)
        .build()

    // Adds the proxy credentials header to every outgoing request.
    val proxyAuthInterceptor = HttpRequestInterceptor { request, _ ->
        request.addHeader(BasicHeader("Proxy-Authorization", getBasicAuth()))
    }

    val client: HttpClient = ApacheHttpTransport.newDefaultHttpClientBuilder()
        .setRoutePlanner(routePlanner)
        .setProxyAuthenticationStrategy(ProxyAuthenticationStrategy.INSTANCE)
        .setDefaultCredentialsProvider(proxyCredentials)
        .setDefaultRequestConfig(requestConfig)
        .addInterceptorLast(proxyAuthInterceptor)
        .build()

    val transport: HttpTransport = ApacheHttpTransport(client)
    return HttpTransportFactory { transport }
}
还尝试使用 LogNet 的 grpc-spring-boot-starter 提供的 @GRpcGlobalInterceptor:
https://github.com/LogNet/grpc-spring-boot-starter
/**
 * Exposes a [GrpcServerInterceptor], built from the configuration properties, as a bean
 * annotated with `@GRpcGlobalInterceptor`.
 */
@Bean
@GRpcGlobalInterceptor
fun globalServerInterceptor(): ServerInterceptor =
    GrpcServerInterceptor(configurationProperties)
/**
 * Exposes a [GrpcClientInterceptor], built from the configuration properties, as a bean
 * annotated with `@GRpcGlobalInterceptor`.
 */
@Bean
@GRpcGlobalInterceptor
fun globalClientInterceptor(): ClientInterceptor =
    GrpcClientInterceptor(configurationProperties)
和
/**
 * Client interceptor that puts a `Proxy-Authorization` entry with HTTP Basic credentials
 * into the metadata of every outgoing call.
 *
 * The user name and password are read once from the `http.proxy` section of the supplied
 * configuration properties.
 */
class GrpcClientInterceptor(private val configurationProperties: ConfigurationProperties) :
    ClientInterceptor {
    private val proxyUsername = configurationProperties.http.proxy.username
    private val proxyPassword = configurationProperties.http.proxy.password
    private val proxyHeaderKey = Metadata.Key.of("Proxy-Authorization", Metadata.ASCII_STRING_MARSHALLER)

    /** Returns the `Basic <base64(username:password)>` value for the configured proxy credentials. */
    private fun getBasicAuth(): String {
        val credentialBytes = "$proxyUsername:$proxyPassword".toByteArray()
        return "Basic " + Base64.getEncoder().encodeToString(credentialBytes)
    }

    /**
     * Wraps the call created by [next] so that the proxy header is added to the request
     * metadata when the call is started; the response listener is passed through a plain
     * forwarding wrapper.
     */
    override fun <ReqT, RespT> interceptCall(
        method: MethodDescriptor<ReqT, RespT>?,
        callOptions: CallOptions?, next: Channel
    ): ClientCall<ReqT, RespT>? {
        val delegateCall = next.newCall(method, callOptions)
        return object : SimpleForwardingClientCall<ReqT, RespT>(delegateCall) {
            override fun start(responseListener: Listener<RespT>?, headers: Metadata) {
                headers.put(proxyHeaderKey, getBasicAuth())
                // Pure pass-through listener: every callback is forwarded unchanged.
                val forwardingListener =
                    object : SimpleForwardingClientCallListener<RespT>(responseListener) {}
                super.start(forwardingListener, headers)
            }
        }
    }
}
/**
 * Server interceptor that makes sure the incoming call metadata contains a
 * `Proxy-Authorization` entry, adding HTTP Basic credentials when it is absent.
 *
 * The user name and password are read once from the `http.proxy` section of the supplied
 * configuration properties.
 */
class GrpcServerInterceptor(private val configurationProperties: ConfigurationProperties) :
    ServerInterceptor {
    private val proxyUsername = configurationProperties.http.proxy.username
    private val proxyPassword = configurationProperties.http.proxy.password

    /**
     * Adds the proxy header to [headers] if it is missing, then hands the call to [next].
     * Throws a NullPointerException when [headers] or [next] is null.
     */
    override fun <ReqT : Any?, RespT : Any?> interceptCall(
        call: ServerCall<ReqT, RespT>?,
        headers: io.grpc.Metadata?,
        next: ServerCallHandler<ReqT, RespT>?
    ): ServerCall.Listener<ReqT> {
        val authHeaderKey = Metadata.Key.of("Proxy-Authorization", Metadata.ASCII_STRING_MARSHALLER)
        val metadata = headers!!
        if (!metadata.containsKey(authHeaderKey)) {
            metadata.put(authHeaderKey, getBasicAuth())
        }
        return next!!.startCall(call, metadata)
    }

    /** Returns the `Basic <base64(username:password)>` value for the configured proxy credentials. */
    private fun getBasicAuth(): String {
        val credentialBytes = "$proxyUsername:$proxyPassword".toByteArray()
        return "Basic " + Base64.getEncoder().encodeToString(credentialBytes)
    }
}
(也尝试过直接在类级别上添加该注解——当然,同样不起作用)
还尝试使用 @GrpcGlobalServerInterceptor
和 @GrpcGlobalClientInterceptor,
它们来自 https://github.com/yidongnan/grpc-spring-boot-starter/tree/v2.12.0.RELEASE ,但引入这个依赖后应用程序直接崩溃了。
解决方案
在 Java API 文档的 configure-a-proxy 一节中,您可以找到如何设置代理凭据的示例:
/**
 * Creates a CloudTasksClient whose gRPC channel is configured with a proxy detector.
 *
 * For every target address the detector returns an HttpConnectProxiedSocketAddress that
 * points at PROXY_HOST:PROXY_PORT and carries PROXY_USERNAME / PROXY_PASSWORD.
 *
 * @return a client created from settings that use the proxy-aware transport channel provider
 * @throws IOException as declared by the signature
 */
public CloudTasksClient getService() throws IOException {
    // Channel configurator: installs the credential-carrying proxy detector on each channel builder.
    ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> proxyConfigurator =
        new ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder>() {
            @Override
            public ManagedChannelBuilder apply(ManagedChannelBuilder managedChannelBuilder) {
                ProxyDetector authenticatingDetector =
                    new ProxyDetector() {
                        @Nullable
                        @Override
                        public ProxiedSocketAddress proxyFor(SocketAddress socketAddress)
                            throws IOException {
                            InetSocketAddress proxyAddress =
                                new InetSocketAddress(PROXY_HOST, PROXY_PORT);
                            InetSocketAddress targetAddress = (InetSocketAddress) socketAddress;
                            return HttpConnectProxiedSocketAddress.newBuilder()
                                .setUsername(PROXY_USERNAME)
                                .setPassword(PROXY_PASSWORD)
                                .setProxyAddress(proxyAddress)
                                .setTargetAddress(targetAddress)
                                .build();
                        }
                    };
                return managedChannelBuilder.proxyDetector(authenticatingDetector);
            }
        };

    TransportChannelProvider transportChannelProvider =
        CloudTasksStubSettings.defaultGrpcTransportProviderBuilder()
            .setChannelConfigurator(proxyConfigurator)
            .build();

    CloudTasksSettings.Builder settingsBuilder = CloudTasksSettings.newBuilder();
    settingsBuilder.setTransportChannelProvider(transportChannelProvider);
    CloudTasksSettings cloudTasksSettings = settingsBuilder.build();

    return CloudTasksClient.create(cloudTasksSettings);
}
请注意该文档中的说明:gRPC 代理目前仍处于实验阶段。
推荐阅读
- c# - Program1 不能正确运行 Program2
- mysql - 用于基于三个参数查找唯一行的 SQL 查询 - 类似于“在已排序的分组集中获取第一行”
- r - 在 R 中总结并创建堆积条形图
- python - 无法在 Python 中安装 ICONV 库
- r - 在 ggplot2 中创建额外的独立图例
- activemq - 消息选择器不工作的消费者
- python - 如何使精灵平滑旋转
- c++ - 汇编语言计算错误
- android - How to create a flow which detects consecutive value increments of another flow?
- java - 排序后无法在java中的字符数组上进行二进制搜索