spring - 背压在 Project Reactor 中是如何工作的?
问题描述
我一直在 Spring Reactor 中工作,并且之前进行了一些测试,这让我想知道 Fluxes 在默认情况下如何处理背压。我知道存在 onBackpressureBuffer 等,并且我还读过RxJava 默认为无界,直到您定义是否缓冲、丢弃等。
那么,谁能为我澄清一下:Reactor 3 中 Flux 的默认背压行为是什么?
我尝试搜索答案,但没有找到任何明确的答案,只有 Backpressure 的定义或上面链接的 RxJava 答案
解决方案
什么是背压?
背压或消费者向生产者发出发射率太高的信号的能力 -反应堆参考
当我们谈论背压时,我们必须将来源/发布者分为两组:尊重订阅者需求的一组,以及忽略它的一组。
通常热门来源不尊重订阅者的需求,因为它们经常产生实时数据,例如收听 Twitter 提要。在此示例中,订阅者无法控制创建推文的速度,因此很容易不知所措。
另一方面,冷源通常在订阅发生时按需生成数据,例如发出 HTTP 请求然后处理响应。在这种情况下,您调用的 HTTP 服务器只会在您发送请求后发送响应。
重要的是要注意这不是一条规则:不是每个热源都忽略需求,也不是每个冷源都尊重它。您可以在此处阅读有关冷热源的更多信息。
让我们看一些可能有助于理解的例子。
尊重需求的出版商
Integer.MAX_VALUE
给定一个Flux 产生从 1 到
Flux.range(1, Integer.MAX_VALUE)
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)), 1) // simulate that processing takes time
.blockLast();
让我们看看日志:
[ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(1)
[ INFO] (main) | request(1)
[ INFO] (main) | onNext(2)
[ INFO] (parallel-1) | request(1)
[ INFO] (parallel-1) | onNext(3)
[ INFO] (parallel-2) | request(1)
[ INFO] (parallel-2) | onNext(4)
[ INFO] (parallel-3) | request(1)
[ INFO] (parallel-3) | onNext(5)
我们可以看到,在每个 onNext 之前都有一个请求。请求信号由concatMap
操作员发送。它在concatMap
完成当前元素并准备好接受下一个元素时发出信号。源仅在收到来自下游的请求时才发送下一项。
在这个例子中,背压是自动的,我们不需要定义任何策略,因为操作员知道它可以处理什么并且源尊重它。
定义了忽略需求且无背压策略的发布者
为了简单起见,我为本示例选择了一个易于理解的冷发布者。它是Flux.interval每个指定的时间间隔发出一个项目。这个冷发布者不尊重需求是有道理的,因为看到项目以不同的、比最初指定的更长的间隔发出会很奇怪。
让我们看看代码:
Flux.interval(Duration.ofMillis(1))
.log()
.concatMap(x -> Mono.delay(Duration.ofMillis(100)))
.blockLast();
Source 每毫秒发出一个项目。订阅者能够每 100 毫秒处理一项。很明显,订阅者无法跟上生产者的步伐,我们很快就会遇到这样的异常:
reactor.core.Exceptions$OverflowException: Could not emit tick 32 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
...
我们可以做些什么来避免这个异常?
定义了忽略需求和背压策略的发布者
默认的背压策略是我们在上面看到的:以错误终止。Reactor 不会对我们强制执行任何错误处理策略。当我们看到这种错误时,我们可以决定哪一个最适合我们的用例。
您可以在Reactor 参考中找到其中的几个。
对于这个例子,我们将使用最简单的一个:onBackpressureDrop
.
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop()
.concatMap(a -> Mono.delay(Duration.ofMillis(100)).thenReturn(a))
.doOnNext(a -> System.out.println("Element kept by consumer: " + a))
.blockLast();
输出:
Element kept by consumer: 0
Element kept by consumer: 1
Element kept by consumer: 2
Element kept by consumer: 3
Element kept by consumer: 4
Element kept by consumer: 5
Element kept by consumer: 6
Element kept by consumer: 7
Element kept by consumer: 8
Element kept by consumer: 9
Element kept by consumer: 10
Element kept by consumer: 11
Element kept by consumer: 12
Element kept by consumer: 13
Element kept by consumer: 14
Element kept by consumer: 15
Element kept by consumer: 16
Element kept by consumer: 17
Element kept by consumer: 18
Element kept by consumer: 19
Element kept by consumer: 20
Element kept by consumer: 21
Element kept by consumer: 22
Element kept by consumer: 23
Element kept by consumer: 24
Element kept by consumer: 25
Element kept by consumer: 26
Element kept by consumer: 27
Element kept by consumer: 28
Element kept by consumer: 29
Element kept by consumer: 30
Element kept by consumer: 31
Element kept by consumer: 2399
Element kept by consumer: 2400
Element kept by consumer: 2401
Element kept by consumer: 2402
Element kept by consumer: 2403
Element kept by consumer: 2404
Element kept by consumer: 2405
Element kept by consumer: 2406
Element kept by consumer: 2407
我们可以看到在前 32 个项目之后有一个相当大的跳跃到 2400。由于定义的策略,它们之间的元素被删除了。
关键要点
- 背压通常是自动的,我们不需要做任何事情,因为我们按需获取数据。
- 如果来源不尊重订户需求,我们需要定义一个策略来避免终止错误。
更新: 有用的阅读:如何控制请求率
推荐阅读
- javascript - 在客户端运行 python
- python-3.x - How to have my defined refresh function running in the background of my twisted server
- c# - OpenCover C# Branch Coverage 说在 throw new Exception 上应该有一个分支
- angular - 我的 Angular 项目以无法获取开始
- javascript - Why Is My React Application Suddenly Not Working Locally?
- task - 成功创建任务但未将其添加到 Microsoft Dynamics CRM 中的队列的可能原因是什么?
- apache-kafka - Kafka Consumer离开消费组
- go - 按时间戳对 Firebase 查询进行排序似乎返回 0 个结果 [去编程语言]
- html - 关于html+css下拉菜单的问题
- c# - 在 Winforms 中禁用水平自动滚动,而不仅仅是滚动条