首页 > 解决方案 > 单声道超时时的“操作员称为默认 onErrorDropped”

问题描述

在我的生产代码中,当 Mono 超时时,我的日志中出现错误。
我已设法使用以下代码重新创建这些错误:

@Test
public void testScheduler() {
    Mono<String> callableMethod1 = callableMethod();
    callableMethod1.block();

    Mono<String> callableMethod2 = callableMethod();
    callableMethod2.block();
}

private Mono<String> callableMethod() {
    return Mono.fromCallable(() -> {
        Thread.sleep(60);
        return "Success";
    })
            .subscribeOn(Schedulers.elastic())
            .timeout(Duration.ofMillis(50))
            .onErrorResume(throwable -> Mono.just("Timeout"));
}

Mono.fromCallable我正在使用第三方库进行阻塞调用。当此调用超时时,我会收到类似的错误

reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.publisher.Operators - Scheduler worker in group main failed with an uncaught exception

这些错误似乎也是间歇性的,有时当我运行代码时我完全没有错误。但是,当我以 10 次的循环重复呼叫时,我一直得到它们。

标签: project-reactor

解决方案


问题:为什么会发生此错误?

答案

当给timeout()运算符的持续时间已经过去时,它会抛出TimeoutException。这导致以下结果:

  1. 一个onError信号被发送到主反应链。结果,主执行被恢复并且进程继续前进(即,onErrorResume()被执行)。

  2. 在结果 #1 之后不久,在fromCallable()中定义的异步任务被中断,这触发了第二个异常(InterruptedException)。主反应链不能再处理这个InterruptedException因为TimeoutException首先发生并且已经导致主反应链恢复(注意:这种不生成第二个onError信号的行为符合反应流规范 -> Publisher #7)。

由于主链无法优雅地处理第二个异常(InterruptedException ),Reactor 将其记录在错误级别以让我们知道发生了意外异常。

问题:我如何摆脱它们?

简短答案:使用Hooks.onErrorDropped()更改日志级别:

    Logger logger = Logger.getLogger(this.getClass().getName());
    @Test
    public void test() {
        Hooks.onErrorDropped(error -> {
            logger.log(Level.WARNING, "Exception happened:", error);
        });

        Mono.fromCallable(() -> {
            Thread.sleep(60);
            return "Success";
        })
         .subscribeOn(Schedulers.elastic())
         .timeout(Duration.ofMillis(50))
         .onErrorResume(throwable -> Mono.just("Timeout"))
         .doOnSuccess(result -> logger.info("Result: " + result))
         .block();
     }

长答案:如果您的用例允许,您可以处理fromCallable()中发生的异常,以便影响主链的唯一异常是TimeoutException。在这种情况下,onErrorDropped()不会首先发生。

@Test
public void test() {
    Mono.fromCallable(() -> {
        try {
            Thread.sleep(60);
        } catch (InterruptedException ex) {
            //release resources, rollback actions, etc
            logger.log(Level.WARNING, "Something went wrong...", ex);
        }
        return "Success";
    })
     .subscribeOn(Schedulers.elastic())
     .timeout(Duration.ofMillis(50))
     .onErrorResume(throwable -> Mono.just("Timeout"))
     .doOnSuccess(result -> logger.info("Result: " + result))
     .block();
}

额外参考:


推荐阅读