首页 > 解决方案 > Reactive Java OneError 恢复错误处理

问题描述

尝试保存事件有这个流程(repo 是响应式的,这只是测试的示例代码。我是新的响应式,我正在使用 io.projectreactor (3.3))

  1. 验证事件,失败时写入历史
  2. 如果验证成功,将事件写入 repo,任何失败写入历史
  3. 如果验证失败写入历史
  4. 诱导一些失败来模拟错误情况
import reactor.core.publisher.Mono;

public class MyTest {

    static int counter = 0;


    public static void main(String args[]) throws InterruptedException
    {
        String array[] = {"1","2","3","4",null,"5"};
        for(int i =0; i < 5; i++)
        {
            System.out.println("input:: "+array[i]);
            new MyTest().createMessage(array[i]);
            counter++;
            Thread.sleep(500);
        }
    }
    private void createMessage(String input)
    {
        new MyTest().onMessage(input)
                .doOnSuccess(s -> System.out.println("----done::success-----"))
                .onErrorResume(e ->
                {System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
                return Mono.empty();})
                .subscribe();
    }

    private Mono<String> onMessage(String input)
    {
        return Mono.create(sink -> {
            validate()
                    .onErrorResume(e -> {
                        System.out.println("error onMessage:: fail to validate");
                        sink.error(e);
                        return Mono.error(e);
                    })
                    .flatMap(a -> processObject(input))
                    .flatMap(h -> {
                        System.out.println("success onMessage :: save history");
                        new Service().saveHistory(input, false);
                        sink.success();
                        return Mono.just(h);
                      })
                     .subscribe();
        });

    }

    private Mono<String> processObject(String input)
    {
           return Mono.create(sink -> {
               new Service().saveEvent(input).flatMap(a -> {
                   System.out.println("success processObject");
                   sink.success(a);
                   return Mono.just(a);
               }).onErrorResume(e -> {
                   new Service().saveHistory(input, true);
                   System.out.println("error processObject");
                   sink.error(e);
                   return Mono.error(e);
               }).subscribe();
           });

    }

    private Mono<String> validate()
    {
        counter++;
        return Mono.create(sink -> {
            if (counter % 3 == 0)
            {
                sink.error(new RuntimeException("Validate method error"));
                return;
            }
            sink.success("validate is done ");
            return;
        });

    }


}

服务等级

public class Service {


    public Mono<String> saveEvent(String id)
    {
        return save(id)
                .onErrorResume(e -> {
                    System.out.println("Error in save event");
                    return Mono.error(e);
                }).doOnNext(e -> System.out.println("save event"));

    }

    public Mono<String> saveHistory(String id, boolean error)
    {
        return save(id)
                .onErrorResume(e -> {
                    System.out.println("Error in save history");
                    return Mono.error(e);
                }).doOnNext(e -> System.out.println("save history"));

    }

    public Mono<String> save(String id)
    {

        if (id  == null)
        {
            throw new RuntimeException("Error saving");
        }

        return Mono.just("save success");
    }

}

我得到了这个例外

---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: Validate method error
Caused by: java.lang.RuntimeException: Validate method error
    at sample.MyTest.lambda$validate$9(MyTest.java:77)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
    at sample.MyTest.lambda$onMessage$5(MyTest.java:49)
    at reactor.core.publisher.MonoCreate.subscribe(MonoCreate.java:57)
    at reactor.core.publisher.Mono.subscribe(Mono.java:4110)
    at reactor.core.publisher.Mono.subscribeWith(Mono.java:4216)
    at reactor.core.publisher.Mono.subscribe(Mono.java:3942)
    at sample.MyTest.createMessage(MyTest.java:30)
    at sample.MyTest.main(MyTest.java:18)

更新的工作代码:基于@Michael Berry 评论

 public static void main(String args[]) throws InterruptedException
    {
        String array[] = {"1","2","3","4",null,"5"};
        for(int i =0; i < 5; i++)
        {
            System.out.println("input:: "+array[i]);
            new MyTest().createMessage(array[i]);
            counter++;
            Thread.sleep(500);
        }
    }
    private void createMessage(String input)
    {
        new MyTest().onMessage(input)
                .doOnSuccess(s -> System.out.println("----done::success-----"))
                .onErrorResume(e ->
                {
                    System.out.println("---done::error --creatMessage::doOnError:: caused by "+e);
                   return Mono.empty();
                })
                .subscribe();
    }

    private Mono<String> onMessage(String input) {
        return validate()
                .onErrorResume(e -> {
                    System.out.println("error onMessage:: fail to validate");
                    return Mono.error(e);
                })
                .flatMap(a -> processObject(input))
                .flatMap(h -> {
                    System.out.println("success onMessage :: save history");
                    new Service().saveHistory(input, false);
                    return Mono.just(h);
                });
    }

    private Mono<String> processObject(String input)
    {
           return new Service().saveEvent(input).flatMap(a -> {
                   System.out.println("success processObject");
                   return Mono.just(a);
               }).onErrorResume(e -> {
                   new Service().saveHistory(input, true);
                   System.out.println("error processObject");
                    return Mono.error(e);
               });

    }

    private Mono<String> validate()
    {
        counter++;

            if (counter % 3 == 0)
            {
                return Mono.error(new RuntimeException("Validate method error"));

            }
          return  Mono.just("validate is done ");



    }

结果

save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error

标签: project-reactor

解决方案


由于您的onMessage()实现,您在这里遇到错误,这有点奇怪:

  • 您正在包装 a Monoin Mono.create(),没有理由这样做;
  • 您自己订阅了这个内部发布者 - 这几乎总是错误的做法,并且不一定会按照您的预期进行(订阅发布者应该由框架处理,而不是您的代码。)在这种情况下,关键事情是它意味着它是单独处理的,而不是你的反应链的一部分,所以你的错误处理可能没有像你期望的那样映射到内部发布者;
  • onErrorResume()对这个内部发布者本身的调用会返回一个错误,并且在这个内部发布者上没有其他错误处理 - 因此为什么该错误未处理,因此它会打印出您看到的堆栈跟踪。

相反,您很可能希望您的onMessage()方法这样读取:

private Mono<String> onMessage(String input) {
    return validate()
            .onErrorResume(e -> {
                System.out.println("error onMessage:: fail to validate");
                return Mono.error(e);
            })
            .flatMap(a -> processObject(input))
            .flatMap(h -> {
                System.out.println("success onMessage :: save history");
                new Service().saveHistory(input, false);
                return Mono.just(h);
            });
}

...没有Mono.create()(仅出于兼容性目的,非反应器回调 API 才真正使用。)您的输出与此更改然后如下所示:

input:: 1
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 2
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate     method error
input:: 3
save event
success processObject
success onMessage :: save history
----done::success-----
input:: 4
save event
success processObject
success onMessage :: save history
----done::success-----
input:: null
error onMessage:: fail to validate
---done::error --creatMessage::doOnError:: caused by java.lang.RuntimeException: Validate method error

推荐阅读