java - Spring Reactor:这个 Flux 会内存溢出吗?
问题描述
考虑以下代码:
public static void main(String[] args) throws Exception {
UnicastProcessor<Integer> processor = UnicastProcessor.create();
Flux<Integer> source = Flux
.just(1, 2, 3, 4, 5)
.delayElements(Duration.ofMillis(1))
.repeat();
source.subscribe(processor);
processor.subscribe(i -> {
System.out.println("i = " + i);
System.out.println("processor.size() = " + processor.size());
try {
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
});
Thread.currentThread().join();
}
可以看到,source
订阅者的延迟远小于订阅者的延迟(休眠 1 秒)。
问题:
- 这段代码可能会内存不足吗?
- 我总是看到
processor.size() = 0
。为什么?我希望队列被填满(它是无界的) - 如果我删除
delayElements
命令,我什么也看不到。为什么?
谢谢!
解决方案
查看create()
on的文档UnicastProcessor
,它说:
创建一个新的 UnicastProcessor,它将以无限制的方式在内部队列上缓冲。
(强调我的。)
具体来说,这意味着如果它的订阅者还没有请求数据,它将尝试缓冲任何可用的数据。它不知道Flux
你拥有的数据是无限的,所以它坐在那里缓冲尽可能多的数据。
如果我删除 delayElements 命令,我什么也看不到。为什么?
在delayElements()
那里调用,这为您的订阅者提供了在开始发送数据之前订阅所需的毫秒数-所以Flux
此时它不会尝试缓冲任何数据,只是将其直接发送到 Flux。没有它,Flux
订阅者订阅之前会发出值,因此您将获得如上所述的无限缓冲序列。
同样,如果您注释掉repeat()
Flux 上的调用,那么它也可以正常工作,因为处理器将缓冲这 5 个值,直到进行 subscribe 调用。
这段代码可能会内存不足吗?
在发生无限缓冲的情况下,是的 - 最终将没有足够的内存供处理器保存更多数据。
我总是看到 processor.size() = 0。为什么?
因为处理器在将其发布给您的订阅者时还没有保存任何值 - 它不需要在那时缓冲它们。如果您size()
在处理器尝试无限缓冲这些值时调用它,那么您会看到它不断上升。
推荐阅读
- r - 如何在 r 中绘制 2 条时间序列线及其趋势线
- r - Pheatmap:如何只保留树状图
- javascript - 当我尝试在我的代码中使用它来检查变量时,if 语句不起作用
- python - 使用交叉验证 XGboost 时出现 Windows 错误 0xe06d7363
- python - 在 CMake 命令 ModuleNotFoundError 中调用 pipenv
- html - Div 宽度:在 IE11 中初始表单加载期间 100% 不工作,在其他浏览器中工作
- reactjs - 为什么状态不会在 React 功能组件中更新
- java - 在 Android Auto Backup 中解密“传输被拒绝的包,因为它当时无法处理它”错误
- android - 尝试连接 api 时 onResponse 不成功
- javascript - 使用 2 种形式将数据从烧瓶发送到 html