首页 > 解决方案 > 如何在 RxJava 中捕获 NPE 并将自定义错误发送到 Publisher

问题描述

我正在做这里显示的内容的 rxified 版本:(https://vertx.io/docs/vertx-amqp-client/java/#_creating_a_receiver)。

基本上,只要在连接上收到消息,下面的代码就会获取消息。在幕后,我通过停止消息代理服务来模拟断开的连接。这会导致 AMQP 代码抛出 NPE。我想捕捉 NPE 并向订阅者发送更好的错误。

当 NPE 发生时,我似乎无法捕捉到错误,所以我不确定如何实现这一点。

public Publisher<Object> receiveAmqpMessages(AmqpConnection connection, String address) {
  return connection
   .rxCreateReceiver(address)
   .flatMapPublisher(receiver ->
     receiver.toObservable()
       .doFinally(receiver::rxClose)
       .onErrorReturn(e-> {
          System.out.println("I never see this message");
          return null;
       }.toFlowable(BackpressureStrategy.BUFFER)
   );
}

我尝试像这样将它包装在 try/catch 中,但它没有进入 catch 块

public Publisher<Object> receiveAmqpMessages(AmqpConnection connection, String address) {
  try {
    // same code shown above
  } catch (NullPointerException e) {
    System.out.println("I never see this message either");
    return Observable.error(new Exception("foo")).toFlowable(BackpressureStrategy.BUFFER);
  }
}

FWIW 控制台看起来像这样(注意 - 控制台从不输出任何引用我自己的类文件的内容):

SEVERE: Unhandled Exception
java.lang.NullPointerException
at io.vertx.ampq.impl.AmqpConnectionImpl.lambda$createReceiver$8(Line 251)
...

如果您打开他们的 AmqpConnectionImpl 并转到您看到的那行代码

ProtonReceiver receiver = connection.get().createReceiver(address,opts)

这会导致 NPE,因为 connection.get() 为空

标签: javarx-javarx-java2

解决方案


推荐阅读