首页 > 解决方案 > 从 Flux 中收集所有错误

问题描述

我有一个Flux<Message>, 和一个Mono<Void> process(Message m)函数。该process函数可以返回错误或不返回任何内容。我想process并行调用该函数,并将所有有进程错误的 messageId 收集到List<String>.

@Data
class Message {
    String messageId;
    String content;    
}

Flux.just(new Message("A", "1"), new Message("B", "2"))
    .flatMap(m -> 
        process(m).onErrorReturn(m.messageId)
    )

但问题是我不能返回方法以外的Void类型onErrorXXX

标签: javareactive-programmingspring-webfluxproject-reactorreactor

解决方案


您可以做的是拥有一个包含 messageId 和错误消息的包装器,如下所示:

@Value
private static class ProcessingError {
    private final String id;
    private final String message;
    private final boolean hasError;
}

Flux.just(new Message("A", "1"), new Message("B", "2"))
    .flatMap(m -> process(m).thenReturn(new ProcessingError(m.messageId, null, false))
                            .onErrorResume(th -> Mono.just(new ProcessingError(m.messageId, th.getMessage(), true))));

现在之后的通量flatMap是 a Flux<ProcessingError>,你可以只过滤一个,hasError = true然后收集你想要的方式。


推荐阅读