Setting the timeout of the pub-sub library's HTTP client

Problem description

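I am trying to locally test a service that creates some Pub-Sub topics in Google Cloud on startup. I am using my personal credentials to connect to Google's services (I am not using the pub-sub emulator). However, it seems that user credentials are heavily restricted in which APIs they may use, and they appear to be heavily throttled. The problem is that after sending the request to create a topic, the application hangs for about 10 minutes and cannot be stopped (by pressing Ctrl+C in the terminal); I have to kill it.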

The warning that appears when the application starts is:

Your application has authenticated using end user credentials from Google Cloud SDK. We recommend that most server applications use service accounts instead. If your application continues to use end user credentials from Cloud SDK, you might receive a "quota exceeded" or "API not enabled" error. For more information about service accounts, see https://cloud.google.com/docs/authentication/.

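After the application has been running for 10 minutes, I get the following error, which suggests that the HTTP client has a very long timeout and that Google's API takes a long time to respond: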

Error creating PubSub topic: com.google.api.gax.rpc.UnavailableException: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
        at com.google.api.gax.rpc.ApiExceptionFactory.createException(ApiExceptionFactory.java:69)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:72)
        at com.google.api.gax.grpc.GrpcApiExceptionFactory.create(GrpcApiExceptionFactory.java:60)
        at com.google.api.gax.grpc.GrpcExceptionCallable$ExceptionTransformingFuture.onFailure(GrpcExceptionCallable.java:97)
        at com.google.api.core.ApiFutures$1.onFailure(ApiFutures.java:68)
        at com.google.common.util.concurrent.Futures$CallbackListener.run(Futures.java:1083)
        at com.google.common.util.concurrent.DirectExecutor.execute(DirectExecutor.java:30)
        at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:1174)
        at com.google.common.util.concurrent.AbstractFuture.complete(AbstractFuture.java:969)
        at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:760)
        at io.grpc.stub.ClientCalls$GrpcFuture.setException(ClientCalls.java:563)
        at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:533)
        at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:413)
        at io.grpc.internal.ClientCallImpl.access$500(ClientCallImpl.java:66)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
        at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:721)
        at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)
        Suppressed: com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
                at com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
                at com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
                at com.google.cloud.pubsub.v1.TopicAdminClient.createTopic(TopicAdminClient.java:284)
                at com.google.cloud.pubsub.v1.TopicAdminClient.createTopic(TopicAdminClient.java:206)
                at com.myapp.integrations.common.pubsub.KenectPubSub.createPubSubTopic(KenectPubSub.java:104)
                at com.myapp.integrations.hub.config.BeanConfiguration_ProducerMethod_getPubSub_8c1027f866ef011e10384d59fcdcf03ffcde3048_ClientProxy.createPubSubTopic(BeanConfiguration_ProducerMethod_getPubSub_8c1027f866ef011e10384d59fcdcf03ffcde3048_ClientProxy.zig:358)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl.createNewTopic(IntegrationsRouterImpl.java:166)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl.lambda$init$0(IntegrationsRouterImpl.java:89)
                at java.base/java.lang.Iterable.forEach(Iterable.java:75)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl.init(IntegrationsRouterImpl.java:82)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_Bean.create(IntegrationsRouterImpl_Bean.zig:242)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_Bean.create(IntegrationsRouterImpl_Bean.zig:258)
                at io.quarkus.arc.impl.AbstractSharedContext.createInstanceHandle(AbstractSharedContext.java:96)
                at io.quarkus.arc.impl.AbstractSharedContext.access$000(AbstractSharedContext.java:14)
                at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:29)
                at io.quarkus.arc.impl.AbstractSharedContext$1.get(AbstractSharedContext.java:26)
                at io.quarkus.arc.impl.LazyValue.get(LazyValue.java:26)
                at io.quarkus.arc.impl.ComputingCache.computeIfAbsent(ComputingCache.java:69)
                at io.quarkus.arc.impl.AbstractSharedContext.get(AbstractSharedContext.java:26)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_ClientProxy.arc$delegate(IntegrationsRouterImpl_ClientProxy.zig:92)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_ClientProxy.arc_contextualInstance(IntegrationsRouterImpl_ClientProxy.zig:110)
                at com.myapp.integrations.hub.router.IntegrationsRouterImpl_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.notify(IntegrationsRouterImpl_Observer_Synthetic_d70cd75bf32ab6598217b9a64a8473d65e248c05.zig:94)
                at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:282)
                at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:267)
                at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:69)
                at io.quarkus.arc.runtime.LifecycleEventRunner.fireStartupEvent(LifecycleEventRunner.java:23)
                at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:60)
                at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent-858218658.deploy_0(LifecycleEventsBuildStep$startupEvent-858218658.zig:81)
                at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent-858218658.deploy(LifecycleEventsBuildStep$startupEvent-858218658.zig:40)
                at io.quarkus.runner.ApplicationImpl.doStart(ApplicationImpl.zig:763)
                at io.quarkus.runtime.Application.start(Application.java:90)
                at io.quarkus.runtime.ApplicationLifecycleManager.run(ApplicationLifecycleManager.java:95)
                at io.quarkus.runtime.Quarkus.run(Quarkus.java:62)
                at io.quarkus.runtime.Quarkus.run(Quarkus.java:38)
                at io.quarkus.runtime.Quarkus.run(Quarkus.java:104)
                at com.myapp.integrations.hub.Main.main(Main.java:9)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.base/java.lang.reflect.Method.invoke(Method.java:564)
                at io.quarkus.runner.bootstrap.StartupActionImpl$3.run(StartupActionImpl.java:134)
                ... 1 more
Caused by: io.grpc.StatusRuntimeException: UNAVAILABLE: Credentials failed to obtain metadata
        at io.grpc.Status.asRuntimeException(Status.java:533)
        ... 13 more
Caused by: com.google.api.client.http.HttpResponseException: 400 Bad Request
{
  "error": "invalid_grant",
  "error_description": "Bad Request"
}
        at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1113)
        at com.google.auth.oauth2.UserCredentials.refreshAccessToken(UserCredentials.java:203)
        at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:157)
        at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:145)
        at com.google.auth.oauth2.UserCredentials.getRequestMetadata(UserCredentials.java:281)
        at com.google.auth.Credentials.blockingGetToCallback(Credentials.java:112)
        at com.google.auth.Credentials$1.run(Credentials.java:98)
        ... 6 more

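I checked the documentation, and it does not say how to customize the timeout of the library's HTTP client. Does anyone know how to do this? I am using google-cloud-pubsub:1.108.3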

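Please note that this question is about being able to set the timeout in the HTTP client. I know that using a service account's credentials would solve the problem, but I would like to know whether the library offers an option to change the HTTP client settings.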

Tags: java, google-cloud-pubsub

Solution


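I don't see timeout settings discussed anywhere in the Cloud Pub/Sub documentation [1,2]. However, you can handle the timeout manually with the TimeoutException from Java's concurrency utilities, which lets you catch the timeout yourself. See [3] for how the timeout exception is used together with awaitTermination.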

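The awaitTermination method "blocks until all work has completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first" [4]. Note that it is typically used to extend the timeout while waiting for messages with an asynchronous subscription.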
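The blocking behavior quoted above can be tried out without any Pub/Sub dependency, because java.util.concurrent.ExecutorService exposes an awaitTermination method with the same contract. The following is only a minimal sketch under that assumption: the class name BoundedShutdown and the helper shutdownWithin are made up for illustration and are not part of the Pub/Sub client library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of google-cloud-pubsub.
final class BoundedShutdown {

    /** Returns true if all work finished within the timeout, false if it had to be cancelled. */
    static boolean shutdownWithin(ExecutorService executor, long timeout, TimeUnit unit)
            throws InterruptedException {
        executor.shutdown(); // stop accepting new tasks; already submitted tasks keep running
        if (executor.awaitTermination(timeout, unit)) {
            return true; // everything completed before the timeout elapsed
        }
        executor.shutdownNow(); // timeout elapsed: interrupt whatever is still running
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // Simulates a call that hangs far longer than we are willing to wait.
        executor.submit(() -> {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        boolean finished = shutdownWithin(executor, 200, TimeUnit.MILLISECONDS);
        System.out.println("finished in time: " + finished);
    }
}
```

Note that awaitTermination only bounds how long the caller waits. Whether the underlying work actually stops depends on that work reacting to the interrupt.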

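There are other options for configuring a timeout on a single thread in Java, but they are not commonly used with Pub/Sub [5]. Also, as you said, if the credentials are not configured correctly, the request will not succeed even if you extend the timeout.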
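One such general-purpose option is to run the blocking call on a worker thread and wait for its result with Future.get(timeout, unit), which throws TimeoutException once the deadline passes. The sketch below uses only the JDK. The names TimeoutGuard and callWithTimeout are hypothetical, and the sleeping lambda merely stands in for a slow call such as topic creation. It does not change any setting inside the Pub/Sub library's own HTTP client.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of google-cloud-pubsub.
final class TimeoutGuard {

    /** Runs the task on a daemon worker thread and gives up waiting after the timeout. */
    static <T> T callWithTimeout(Callable<T> task, long timeout, TimeUnit unit)
            throws TimeoutException, ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
            Thread worker = new Thread(runnable, "timeout-guard");
            worker.setDaemon(true); // a hung task must not keep the JVM alive
            return worker;
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(timeout, unit); // throws TimeoutException when the deadline passes
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the worker; the task must honor interruption to stop
            throw e;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        try {
            // Stand-in for a slow blocking call.
            callWithTimeout(() -> {
                Thread.sleep(5_000);
                return "created";
            }, 200, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("Gave up waiting for the call");
        }
    }
}
```

The daemon thread is a deliberate choice here: if the wrapped call ignores interruption, the caller still regains control and the process can exit, which addresses the "cannot be stopped" symptom rather than the underlying credentials error.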

[1] - https://cloud.google.com/pubsub/docs/

[2] - https://googleapis.dev/java/google-cloud-pubsub/1.108.3/index.html

[3] - https://cloud.google.com/pubsub/docs/quickstart-client-libraries#receive_messages

[4] - https://googleapis.dev/java/gax/latest/com/google/api/gax/core/BackgroundResource.html#awaitTermination-long-java.util.concurrent.TimeUnit-

[5] - Simple timeout in Java

