首页 > 解决方案 > Spring Reactor:这个 Flux 会内存溢出吗?

问题描述

考虑以下代码:

public static void main(String[] args) throws Exception {

        UnicastProcessor<Integer> processor = UnicastProcessor.create();
        Flux<Integer> source = Flux
                .just(1, 2, 3, 4, 5)
                .delayElements(Duration.ofMillis(1))
                .repeat();

        source.subscribe(processor);

        processor.subscribe(i -> {
            System.out.println("i = " + i);
            System.out.println("processor.size() = " + processor.size());
            try {
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        });

        Thread.currentThread().join();

    }

可以看到,source订阅者的延迟远小于订阅者的延迟(休眠 1 秒)。

问题:

  1. 这段代码可能会内存不足吗?
  2. 我总是看到processor.size() = 0。为什么?我希望队列被填满(它是无界的)
  3. 如果我删除delayElements命令,我什么也看不到。为什么?

谢谢!

标签: javareactive-programmingproject-reactor

解决方案


查看create()on的文档UnicastProcessor,它说:

创建一个新的 UnicastProcessor,它将以无限制的方式在内部队列上缓冲

(强调我的。)

具体来说,这意味着如果它的订阅者还没有请求数据,它将尝试缓冲任何可用的数据。它不知道Flux你拥有的数据是无限的,所以它坐在那里缓冲尽可能多的数据。

如果我删除 delayElements 命令,我什么也看不到。为什么?

delayElements()那里调用,这为您的订阅者提供了在开始发送数据之前订阅所需的毫秒数-所以Flux此时它不会尝试缓冲任何数据,只是将其直接发送到 Flux。没有它,Flux订阅者订阅之前会发出值,因此您将获得如上所述的无限缓冲序列。

同样,如果您注释掉repeat()Flux 上的调用,那么它也可以正常工作,因为处理器将缓冲这 5 个值,直到进行 subscribe 调用。

这段代码可能会内存不足吗?

在发生无限缓冲的情况下,是的 - 最终将没有足够的内存供处理器保存更多数据。

我总是看到 processor.size() = 0。为什么?

因为处理器在将其发布给您的订阅者时还没有保存任何值 - 它不需要在那时缓冲它们。如果您size()在处理器尝试无限缓冲这些值时调用它,那么您会看到它不断上升。


推荐阅读