首页 > 解决方案 > 在长时间运行的 Cloud PubSub 订阅者服务中捕获侦听器异常

问题描述

我正在尝试用 Java 编写一个长期运行的订阅者服务。我已经设置了侦听器来侦听订阅者服务中的任何故障。我试图让这个容错,我不太了解一些事情,以下是我的疑问/问题。

  1. 我已遵循此处显示的基本设置https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/片段/SubscriberSnippets.java。具体来说,我设置了 addListener ,如下所示。

如以下代码所示,initializeSubscriber 充当一个状态变量,它将确定订阅者服务是否应该重新启动。在 while 循环内部,会持续监控这个变量以确定是否需要重新启动。

我的问题是,1. 如何在 Subscriber.Listener 的失败方法中引发异常并在主 while 循环中捕获它。我尝试在失败的方法中抛出一个新的 Exception() 并将其捕获到内部的 catch 块中,但是,我无法编译代码,因为它是一个已检查的异常。2.如图这里,我使用Java Executor线程来运行Listener。如何处理侦听器故障?我能否在此处显示的一般异常捕获块下捕获侦听器故障?

try {
 boolean initializeSubscriber = true;
    while (true) {
        try {
           if (initializeSubscriber) { 
             createSingleThreadedSubscriber();
             addErrorListenerToSubscriber();
             subscriber.startAsync().awaitRunning();
             initializeSubscriber = false;
           }

          // Checks the status of subscriber service every minute
          Thread.sleep(60000);

        } catch (Exception ex) {
          LOGGER.error("Could not start the Subscriber service", ex);
          cleanupSubscriber();
          initializeSubscriber = true;
        }
    }
} catch (RuntimeException e) {

} finally {
    shutdown();
}
private void addErrorListenerToSubscriber() {
    subscriber.addListener(
      new Subscriber.Listener() {
          @Override
          public void failed(Subscriber.State from, Throwable failure) throws RuntimeException { 
            LOGGER.info("Subscriber reached a failed state due to " + failure.getMessage()
                + ",Restarting Subscriber service");
            initializeSubscriber = true; 
          }
      },
      Executors.newSingleThreadExecutor());
  }

  private void cleanupSubscriber() {

    try {
      if (subscriber != null) {
        subscriber.stopAsync().awaitTerminated();
      }
      if (!subscriptionListener.isShutdown()) {
        subscriptionListener.shutdown();
      }
    } catch (Exception ex) {
      LOGGER.error("Error in cleaning up Subscriber thread " + ex);
    }
  }

标签: google-cloud-platformgoogle-cloud-pubsub

解决方案


如果您只想在失败时重新创建订阅者,则不需要向订阅者添加侦听器。您可以改为在以下位置捕获异常awaitTerminated

try {
  boolean initializeSubscriber = true;
  while (initializeSubscriber) {
    try { 
      createSingleThreadedSubscriber();
      subscriber.startAsync().awaitRunning();
      initializeSubscriber = false;
      subscriber.awaitTerminated();
    } catch (Exception ex) {
      LOGGER.error("Error in the Subscriber service", ex);
      cleanupSubscriber();
      initializeSubscriber = true;
    }
  }
}  catch (RuntimeException e) {
} finally {
  shutdown();
}

如果订阅者因为调用 成功关闭stopAsync,则awaitTerminated不会抛出异常。如果有某种异常,那么awaitTerminated将抛出一个,IllegalStateException因为状态将是FAILED而不是TERMINATED

请注意,瞬态错误由库本身处理。例如,如果服务器由于网络中断而暂时不可用,则库将无缝重新连接并继续传递消息。导致订阅者状态更改的故障可能是永久性故障,例如权限问题(运行订阅者的帐户没有订阅订阅的权限)或资源问题(例如订阅已被删除)。在这些永久失败的情况下,重新创建订户可能只会导致相同的错误,除非采取手动步骤来干预和解决问题。


推荐阅读