google-cloud-platform - 在长时间运行的 Cloud PubSub 订阅者服务中捕获侦听器异常
问题描述
我正在尝试用 Java 编写一个长期运行的订阅者服务。我已经设置了侦听器来侦听订阅者服务中的任何故障。我试图让这个容错,我不太了解一些事情,以下是我的疑问/问题。
- 我已遵循此处显示的基本设置https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cloud/examples/pubsub/片段/SubscriberSnippets.java。具体来说,我设置了 addListener ,如下所示。
如以下代码所示,initializeSubscriber 充当一个状态变量,它将确定订阅者服务是否应该重新启动。在 while 循环内部,会持续监控这个变量以确定是否需要重新启动。
我的问题是,1. 如何在 Subscriber.Listener 的失败方法中引发异常并在主 while 循环中捕获它。我尝试在失败的方法中抛出一个新的 Exception() 并将其捕获到内部的 catch 块中,但是,我无法编译代码,因为它是一个已检查的异常。2.如图这里,我使用Java Executor线程来运行Listener。如何处理侦听器故障?我能否在此处显示的一般异常捕获块下捕获侦听器故障?
try {
boolean initializeSubscriber = true;
while (true) {
try {
if (initializeSubscriber) {
createSingleThreadedSubscriber();
addErrorListenerToSubscriber();
subscriber.startAsync().awaitRunning();
initializeSubscriber = false;
}
// Checks the status of subscriber service every minute
Thread.sleep(60000);
} catch (Exception ex) {
LOGGER.error("Could not start the Subscriber service", ex);
cleanupSubscriber();
initializeSubscriber = true;
}
}
} catch (RuntimeException e) {
} finally {
shutdown();
}
private void addErrorListenerToSubscriber() {
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) throws RuntimeException {
LOGGER.info("Subscriber reached a failed state due to " + failure.getMessage()
+ ",Restarting Subscriber service");
initializeSubscriber = true;
}
},
Executors.newSingleThreadExecutor());
}
private void cleanupSubscriber() {
try {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
if (!subscriptionListener.isShutdown()) {
subscriptionListener.shutdown();
}
} catch (Exception ex) {
LOGGER.error("Error in cleaning up Subscriber thread " + ex);
}
}
解决方案
如果您只想在失败时重新创建订阅者,则不需要向订阅者添加侦听器。您可以改为在以下位置捕获异常awaitTerminated
:
try {
boolean initializeSubscriber = true;
while (initializeSubscriber) {
try {
createSingleThreadedSubscriber();
subscriber.startAsync().awaitRunning();
initializeSubscriber = false;
subscriber.awaitTerminated();
} catch (Exception ex) {
LOGGER.error("Error in the Subscriber service", ex);
cleanupSubscriber();
initializeSubscriber = true;
}
}
} catch (RuntimeException e) {
} finally {
shutdown();
}
如果订阅者因为调用 成功关闭stopAsync
,则awaitTerminated
不会抛出异常。如果有某种异常,那么awaitTerminated
将抛出一个,IllegalStateException
因为状态将是FAILED
而不是TERMINATED
。
请注意,瞬态错误由库本身处理。例如,如果服务器由于网络中断而暂时不可用,则库将无缝重新连接并继续传递消息。导致订阅者状态更改的故障可能是永久性故障,例如权限问题(运行订阅者的帐户没有订阅订阅的权限)或资源问题(例如订阅已被删除)。在这些永久失败的情况下,重新创建订户可能只会导致相同的错误,除非采取手动步骤来干预和解决问题。
推荐阅读
- android - Kotlin 获取所选选项的 ID
- python-3.x - 找不到满足 pywin32>=223 要求的版本(来自 pypiwin32)(来自版本:)
- flutter - 如果 GestureDetector 包含 ScrollablePositionedList,则不会触发 onTap 和 onScaleStart
- flutter - Navigator popUntil 后刷新页面
- gradle - 如何定义 gradle 任务依赖关系 - 输入输出或依赖?
- python - 如何使用 python elasticsearch 库将 XML 批量数据加载到 elasticsearch 中?
- php - 我必须如何在 Elasticsearch 中保存类别过滤器?
- wordpress - 将自定义字段添加到自定义 Woocommerce 电子邮件
- git - Git红色和绿色状态图标在文件夹中消失
- python - 在 JupyterLab 上查找输出历史记录