java - Google Pubsub Java Spring Cloud GCP 重新连接问题
问题描述
我有一个在连接到 pubsub 的 Google Kubernetes Engine 上运行的 Spring Boot 应用程序。最近我们开始看到这个问题,应用程序会在一段时间后停止接收来自订阅的消息。如果我们重新启动应用程序,它会正常连接并开始接收消息。
我已经打开了对 DEBUG 的日志记录,它显示定期断开连接并成功进行重新连接尝试,之后它继续接收消息,但在某些时候它只是默默地停止接收消息。在我的本地,我通过启动服务重新创建了场景,它接收消息,然后断开网络,重新连接网络后,上述 keepAlive/重试逻辑由于某种原因没有启动。在一种情况下,它确实在几个小时后启动,一切都很好。
所以问题是,为什么在连接丢失时重试逻辑不启动(在这个测试案例中断开并重新连接网络连接)。
当我可以加入时,我会将相同的问题发布到 Google 讨论组:https ://groups.google.com/forum/#!searchin/cloud-pubsub-discuss
它可能是一些简单的配置,但到目前为止我似乎找不到它。
我重新创建它的设置如下:
遵循本教程的准系统 Spring Boot 应用程序:https ://spring.io/guides/gs/messaging-gcp-pubsub/
使用 Spring Boot 2.0.4.RELEASE、spring-cloud-gcp-starter-pubsub:1.0.0.RELEASE 和 spring-integration-core:5.0.7.RELEASE。
我有多个订阅,因为这是我们的用例。
下面给出了成功连接的重新连接错误(但手动断开并重新连接网络时不会调用此错误):
DEBUG 59005 --- [pool-2-thread-1] cgcpvStreamingSubscriberConnection :流因可重试异常而关闭;将重新连接
io.grpc.StatusRuntimeException:不可用:服务无法满足您的请求。请再试一次。[code=8a75] at io.grpc.Status.asRuntimeException(Status.java:526) ~[grpc-core-1.13.1.jar:1.13.1] at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls .java:420) [grpc-stub-1.13.1.jar:1.13.1] 在 io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) [grpc-core-1.13.1.jar:1.13.1]在 io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) [grpc-core-1.13.1.jar:1.13.1] 在 io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) [grpc- core-1.13.1.jar:1.13.1] 在 io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
@SpringBootApplication
public class PubsubTestApplication {
public static void main(String[] args) {
SpringApplication.run(PubsubTestApplication.class, args);
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapter(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
"subscription-1");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean
public PubSubInboundChannelAdapter messageChannelAdapterA(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
"subscription-2");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean
@ServiceActivator(inputChannel = "pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
System.out.println("Message arrived! Payload: " + new String((byte[])message.getPayload()));
AckReplyConsumer consumer =
(AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
consumer.ack();
};
}
}
解决方案
推荐阅读
- php - 如何将 str_replace 与搜索数组一起使用并将其替换为找到的搜索数组的一部分?
- python - Python 导入 URL 表数据
- python - 如何使用用户输入附加两个字典,但第二个字典应该是值,第一个字典应该是键
- reactjs - 无法构建反应顺风
- r - 如何在本机管道中包含二进制算术运算符
- vim - 如何在`:global`中的`norm`之后使用另一个命令?
- php - CBC 模式下的 AES(块大小 128 位),在 PHP 8 中使用 PKCS#5 填充
- python - Python:使用 seaborn 的消费热图
- angular - 如何以角度从另一个组件调用函数?
- reactjs - Redux Toolkit:state.filter 未更改 state 而 state.map 更改