首页 > 解决方案 > Google Pubsub Java Spring Cloud GCP 重新连接问题

问题描述

我有一个在连接到 pubsub 的 Google Kubernetes Engine 上运行的 Spring Boot 应用程序。最近我们开始看到这个问题,应用程序会在一段时间后停止接收来自订阅的消息。如果我们重新启动应用程序,它会正常连接并开始接收消息。

我已经打开了对 DEBUG 的日志记录,它显示定期断开连接并成功进行重新连接尝试,之后它继续接收消息,但在某些时候它只是默默地停止接收消息。在我的本地,我通过启动服务重新创建了场景,它接收消息,然后断开网络,重新连接网络后,上述 keepAlive/重试逻辑由于某种原因没有启动。在一种情况下,它确实在几个小时后启动,一切都很好。

所以问题是,为什么在连接丢失时重试逻辑不启动(在这个测试案例中断开并重新连接网络连接)。

当我可以加入时,我会将相同的问题发布到 Google 讨论组:https ://groups.google.com/forum/#!searchin/cloud-pubsub-discuss

它可能是一些简单的配置,但到目前为止我似乎找不到它。

我重新创建它的设置如下:

遵循本教程的准系统 Spring Boot 应用程序:https ://spring.io/guides/gs/messaging-gcp-pubsub/

使用 Spring Boot 2.0.4.RELEASE、spring-cloud-gcp-starter-pubsub:1.0.0.RELEASE 和 spring-integration-core:5.0.7.RELEASE。

我有多个订阅,因为这是我们的用例。

下面给出了成功连接的重新连接错误(但手动断开并重新连接网络时不会调用此错误):

DEBUG 59005 --- [pool-2-thread-1] cgcpvStreamingSubscriberConnection :流因可重试异常而关闭;将重新连接

io.grpc.StatusRuntimeException:不可用:服务无法满足您的请求。请再试一次。[code=8a75] at io.grpc.Status.asRuntimeException(Status.java:526) ~[grpc-core-1.13.1.jar:1.13.1] at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls .java:420) [grpc-stub-1.13.1.jar:1.13.1] 在 io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39) [grpc-core-1.13.1.jar:1.13.1]在 io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23) [grpc-core-1.13.1.jar:1.13.1] 在 io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40) [grpc- core-1.13.1.jar:1.13.1] 在 io.grpc.internal.CensusStatsModule$StatsClientInterceptor$1$1.onClose(CensusStatsModule.java:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.integration.AckMode;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.GcpPubSubHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import com.google.cloud.pubsub.v1.AckReplyConsumer;

@SpringBootApplication
public class PubsubTestApplication {

  public static void main(String[] args) {
    SpringApplication.run(PubsubTestApplication.class, args);
  }

  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapter(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-1");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }


  @Bean
  public PubSubInboundChannelAdapter messageChannelAdapterA(@Qualifier("pubsubInputChannel") MessageChannel inputChannel,
      PubSubTemplate pubSubTemplate) {
    PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate,
        "subscription-2");
    adapter.setOutputChannel(inputChannel);
    adapter.setAckMode(AckMode.MANUAL);

    return adapter;
  }

  @Bean
  public MessageChannel pubsubInputChannel() {
    return new DirectChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "pubsubInputChannel")
  public MessageHandler messageReceiver() {
    return message -> {
      System.out.println("Message arrived! Payload: " + new String((byte[])message.getPayload()));
      AckReplyConsumer consumer =
          (AckReplyConsumer) message.getHeaders().get(GcpPubSubHeaders.ACKNOWLEDGEMENT);
      consumer.ack();
    };
  }
}

标签: javagoogle-cloud-platformpublish-subscribesubscription

解决方案


推荐阅读