java - 如何在 RxJava 中捕获 NPE 并将自定义错误发送到 Publisher
问题描述
我正在做这里显示的内容的 rxified 版本:(https://vertx.io/docs/vertx-amqp-client/java/#_creating_a_receiver)。
基本上,只要在连接上收到消息,下面的代码就会获取消息。在幕后,我通过停止消息代理服务来模拟断开的连接。这会导致 AMQP 代码抛出 NPE。我想捕捉 NPE 并向订阅者发送更好的错误。
当 NPE 发生时,我似乎无法捕捉到错误,所以我不确定如何实现这一点。
public Publisher<Object> receiveAmqpMessages(AmqpConnection connection, String address) {
return connection
.rxCreateReceiver(address)
.flatMapPublisher(receiver ->
receiver.toObservable()
.doFinally(receiver::rxClose)
.onErrorReturn(e-> {
System.out.println("I never see this message");
return null;
}.toFlowable(BackpressureStrategy.BUFFER)
);
}
我尝试像这样将它包装在 try/catch 中,但它没有进入 catch 块
public Publisher<Object> receiveAmqpMessages(AmqpConnection connection, String address) {
try {
// same code shown above
} catch (NullPointerException e) {
System.out.println("I never see this message either");
return Observable.error(new Exception("foo")).toFlowable(BackpressureStrategy.BUFFER);
}
}
FWIW 控制台看起来像这样(注意 - 控制台从不输出任何引用我自己的类文件的内容):
SEVERE: Unhandled Exception
java.lang.NullPointerException
at io.vertx.ampq.impl.AmqpConnectionImpl.lambda$createReceiver$8(Line 251)
...
如果您打开他们的 AmqpConnectionImpl 并转到您看到的那行代码
ProtonReceiver receiver = connection.get().createReceiver(address,opts)
这会导致 NPE,因为 connection.get() 为空
解决方案
推荐阅读
- ios - 使用 `localTimeZone` 设置 NSCalendar 的时区与使用 `timeZoneWithAbbreviation` 给出不同值的区别
- sql - MySQL用之前的字段减去字段
- java - 未触发 Firebase 异步上传到 Cloud Firestore
- django - 列出反向外键的第一个条目
- javascript - 应用程序/javascript 或文本/javascript
- docker-compose - 如何从主机连接到数据库 - 在docker compose下运行的容器中的sql server
- unity3d - Admob 横幅位置错误(Unity3d)
- python - Python multiprocessing.Manager().Queue():尝试加入进程时出现死锁
- r - 绘制温度计图
- intellij-idea - 如何在 IntelliJ 中设置“-Xuse-experimental=kotlin.experimental”