java - Project Reactor 的 Flux 在处理错误期间获取失败项目
问题描述
我需要处理Flux
流错误,为此我需要知道究竟是什么项目失败了。似乎方法doOnError
应该适合处理错误,但是我只能得到异常而不是失败的项目。有没有办法同时获得失败的项目和异常?
private void testFluxIterableFlow() {
Flux.fromIterable(Arrays.asList(1, 2, 3, 4, 5))
.map(this::process)
.doOnError(ex -> {
...
})
.doOnNext(processedValue -> ...)
.subscribe();
}
private String process(Integer value) {
if (value == 4) {
throw new RuntimeException("error...");
}
return "processed " + value;
}
在这个例子中,我需要4
在错误处理程序中接收失败的项目和异常消息。
解决方案
这里有点晚了,但以防其他人需要它......您可能想要做的是使用 onErrorContinue。请注意,此方法将从通量中删除错误项并继续处理,因此请务必牢记这一点。如果你想要错误+项目并停止一切,我相信如果你从这个方法重新抛出异常,并且没有其他下游 onErrorContinue,它应该终止通量。这是我使用 kafka 反应式 api 的示例
此外,传递给 onErrorContinue 的项目将不是通量中的原始项目,而是试图在反应链中的步骤处理的对象。例如,如果链在尝试转换 B -> C 时失败,但在转换 A -> B 之后,您将在处理程序中获得的对象将是 B。
@EventListener(ApplicationReadyEvent.class)
public void process() {
receiver.receiveAutoAck()
.concatMap(Function.identity())
.flatMap(this::deserialzeRecord)
.flatMap(this::processEvent)
.flatMap(responseSender::send)
.onErrorContinue(this::handleRecordError)
.subscribe();
}
private void handleRecordError(Throwable throwable, Object record) {
log.error("Received error processing record={}", record, throwable);
}
如果您想终止通量并放弃其余项目,只需执行
private void handleRecordError(Throwable throwable, Object record) {
log.error("Received error processing record={}", record, throwable);
throw throwable;
}
推荐阅读
- angular - 如何解决我从 NPM 安装中遇到的错误问题
- jestjs - 在 Jest 中禁用“您的测试套件必须包含至少一个测试”规则
- angular - 动态创建和读取全局变量
- html - 垂直对齐元素,上方 1/3 空间下方 2/3 空间
- azure - 条件语句不适用于 vm 扩展
- image - 如何使用 imagemagick 从跨越两页的一系列 jpeg 图像创建常规的单页 pdf?
- google-maps-api-3 - Google Geolocation API 在浏览器控制台中返回 403 错误
- python - 无法更新python字典中的输出
- c# - 如何在 10 秒内将 200 万行插入 sqlserver 中的两个表
- laravel - 如何在 laravel 中同时启用 api 和 web 守卫