首页 > 解决方案 > 如何在通量流中获得实际价值,而不是 FluxLift?

问题描述

我得到了一个Publisher<DataBuffer> inputStream论点。我想将该流转换为字符串,记录该字符串,然后我必须将该字符串再次作为 aPublisher传递给另一个方法。

例子:

public Flux<Object> decode(Publisher<DataBuffer> inputStream) {
    return DataBufferUtils.join(inputStream)
                .map(buffer -> StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString())
                .doOnNext(arg -> LOGGER.info(arg))
                .map(arg -> library.delegate(Mono.fromSupplier(() -> arg))) 
                .flatMapIterable(arg -> {
                     System.out.println(arg); //instanceof FluxLift??
                     return List.of(arg);
                );
}

class ExternalLibrary {
    //this ALWAYS returns a FluxLift
    public Flux<Object> delegate(Publisher<String> inputString) {
        //does not matter, I don't have control of this.
        //lets assume it does the following:

        return Flux.just(input).
            flatMapIterable(buffer -> List.of("some", "result"))
            .map(arg -> arg);
    }
}

问题:为什么最后的参数flatMapInterable()总是 type FluxLift?而且:如何才能在这里返回真正的价值?

标签: springspring-webfluxreactor

解决方案


为什么最终 flatMapInterable() 中的参数总是 FluxLift 类型?

因为你的 map 函数返回 Flux

.map(arg -> library.delegate(Mono.fromSupplier(() -> arg)))

怎么能在这里返回真正的价值?

当映射函数返回反应类型时,使用其中一个flatMap*函数而不是map. flatMapMany适合您的情况:

public Flux<Object> decode(Publisher<DataBuffer> inputStream) {
    return DataBufferUtils.join(inputStream) //Mono<DataBuffer>
                .map(buffer -> StandardCharsets.UTF_8.decode(buffer.asByteBuffer()).toString()) //Mono<String>
                .doOnNext(arg -> LOGGER.info(arg)) //Mono<String>
                .flatMapMany(arg -> library.delegate(Mono.fromSupplier(() -> arg))) // Flux<Object>
                .flatMapIterable(arg -> {
                     System.out.println(arg); // instanceof Object
                     return List.of(arg);
                );

推荐阅读