project-reactor - 单声道超时时的“操作员称为默认 onErrorDropped”
问题描述
在我的生产代码中,当 Mono 超时时,我的日志中出现错误。
我已设法使用以下代码重新创建这些错误:
@Test
public void testScheduler() {
Mono<String> callableMethod1 = callableMethod();
callableMethod1.block();
Mono<String> callableMethod2 = callableMethod();
callableMethod2.block();
}
private Mono<String> callableMethod() {
return Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"));
}
在Mono.fromCallable
我正在使用第三方库进行阻塞调用。当此调用超时时,我会收到类似的错误
reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception
这些错误似乎也是间歇性的,有时当我运行代码时我完全没有错误。但是,当我以 10 次的循环重复呼叫时,我一直得到它们。
解决方案
问题:为什么会发生此错误?
答案:
当给timeout()运算符的持续时间已经过去时,它会抛出TimeoutException。这导致以下结果:
一个onError信号被发送到主反应链。结果,主执行被恢复并且进程继续前进(即,onErrorResume()被执行)。
在结果 #1 之后不久,在fromCallable()中定义的异步任务被中断,这触发了第二个异常(InterruptedException)。主反应链不能再处理这个InterruptedException因为TimeoutException首先发生并且已经导致主反应链恢复(注意:这种不生成第二个onError信号的行为符合反应流规范 -> Publisher #7)。
由于主链无法优雅地处理第二个异常(InterruptedException ),Reactor 将其记录在错误级别以让我们知道发生了意外异常。
问题:我如何摆脱它们?
简短答案:使用Hooks.onErrorDropped()更改日志级别:
Logger logger = Logger.getLogger(this.getClass().getName());
@Test
public void test() {
Hooks.onErrorDropped(error -> {
logger.log(Level.WARNING, "Exception happened:", error);
});
Mono.fromCallable(() -> {
Thread.sleep(60);
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
长答案:如果您的用例允许,您可以处理fromCallable()中发生的异常,以便影响主链的唯一异常是TimeoutException。在这种情况下,onErrorDropped()不会首先发生。
@Test
public void test() {
Mono.fromCallable(() -> {
try {
Thread.sleep(60);
} catch (InterruptedException ex) {
//release resources, rollback actions, etc
logger.log(Level.WARNING, "Something went wrong...", ex);
}
return "Success";
})
.subscribeOn(Schedulers.elastic())
.timeout(Duration.ofMillis(50))
.onErrorResume(throwable -> Mono.just("Timeout"))
.doOnSuccess(result -> logger.info("Result: " + result))
.block();
}
额外参考:
推荐阅读
- html - 重叠的圆圈
- php - 如何输出符合 PHP 文本语句的 LaTeX 代码
- android - 将 gRPC 用于 Android 应用程序 - 示例应用程序
- html - 使用局部视图从视图生成 HTML 文件
- javascript - Jitsi meet external api 如何正确配置仅音频和自动加入?
- shell - 如何删除shell中匹配的行和前两行?
- apache-spark - 在条件内计算 MAX 值时出错
- node.js - Kubernetes - AKS:将 AKS 群集与应用程序网关链接。多个站点(不是子页面)指向同一个 IP
- java - Java SocketException:套接字已关闭
- javascript - 新窗口上的本地存储