首页 > 解决方案 > 为什么 Flux.flatMap() 不等待内部发布者完成?

问题描述

您能否解释一下在返回的 Flux/Mono 中究竟发生了什么HttpClient.response() ?我认为在 Mono 完成之前,http 客户端生成的值不会传递到下游,但我看到生成了大量请求,最终出现reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8异常。如果我testRequest()Mono.fromCallable { }.

我错过了什么?

测试代码:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
    private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

    fun main() {
        Flux.fromIterable(0..99)
                .flatMap { obj ->
                    println("Creating request for: $obj")
                    testRequest()
                            .doOnError { ex ->
                                println("Failed request for: $obj")
                                ex.printStackTrace()
                            }
                            .map { res ->
                                obj to res
                            }
                }
                .doOnNext { (obj, res) ->
                    println("Created request for: $obj ${res.length} characters")
                }
                .collectList().block()!!
    }

    fun testRequest(): Mono<String> {
        return client.get()
                .uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
                .responseContent()
                .reduce(StringBuilder(), { sb, buf ->
                    val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
                    sb.append(str)
                })
                .map { it.toString() }
    }
}

标签: kotlinproject-reactorreactive-streamsreactor-netty

解决方案


当您创建ConnectionProvider这样的时ConnectionProvider.create("meh", 4),这意味着最大连接数为 4 和最大挂起请求数为 8 的连接池。有关此内容的更多信息,请参见此处

当您使用flatMap此方法时,Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave请参阅此处了解更多信息。

所以发生的事情是你试图同时运行所有请求。

所以你有两个选择:

  • 如果要使用flatMap,请增加待处理请求的数量。
  • 如果您想保留未决请求的数量,您可以考虑例如使用concatMap而不是flatMap,这意味着Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation在这里查看更多关于此的信息。

推荐阅读