首页 > 解决方案 > 如何在 Java Reactor 中设置完全背压驱动的通量?

问题描述

我有一种情况,我需要多个工人(比如说 2 个)。工作人员必须执行消耗上游事件的任务。

手头的任务消耗一个事件列表,并且具有独立于列表大小的恒定时间。

因此,我希望上游在请求时才提供一个包含所有缓冲事件的列表,一次 1 个列表。

可悲的是,大多数方法都实现了预取。发生的情况是,即使使用 limitRate(1, 0)upstream 接收一个 too many onRequest(1),只是为了补充下游缓冲区。

因此,我很难在工人可用时才生成缓冲列表:它们通常是提前生成的,没有达到我最大化缓冲列表大小的目标。

我怎样才能实现这样的设置?

有没有办法完全禁用预取?

标签: javareactive-programmingproject-reactor

解决方案


不确定我是否正确理解了这个问题。显示您当前正在执行的操作的示例代码会有所帮助。

在 an 之前不从源中提取数据的一种方法onRequestdefer实例化 Flux。所以你的代码看起来像:

Flux source = Flux.defer(() -> getFluxForUpstreamSource());

另一种使用背压从源消费的方法是使用Flux.generate。您的代码将类似于:

Flux source = Flux.generate(
        UpstreamSource::getConnection,
        (connection, sink) -> {
            try {
                sink.next(connection.getNext());
            } catch (UpstreamException e) {
                sink.error(e);
            }
            return connection;
        }
);

推荐阅读