java - Project Reactor:多个发布者发出 HTTP 调用,一个订阅者处理所有结果
问题描述
以下代码的问题在于订阅者只能看到第一个通量的项目(即仅打印1
)。有趣的是,如果我添加delayElements
,它工作正常。
这是一个玩具示例,但我的意图是将其替换为Flux
发出 HTTP GET 请求并发出其结果的 's(也可能超过两个)。
所以重新制定我的问题,我有一个需要实施的多对一关系。考虑到我的情况,如何实施?你会使用某种处理器吗?
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
});
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
尝试使用 TopicProcessor 实现相同的想法,但遇到相同的问题:
public static void main(String[] args) throws Exception {
Flux<Integer> flux1 = Flux.generate(emitter -> {
emitter.next(1);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
Flux<Integer> flux2 = Flux.generate(emitter -> {
emitter.next(2);
try {
Thread.sleep(100);
} catch (Exception e) {}
});
TopicProcessor<Integer> processor = TopicProcessor.create();
flux1.subscribe(processor);
flux2.subscribe(processor);
processor.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
}
解决方案
请注意,合并适用于异步源或有限源。在处理尚未在专用调度器上发布的无限源时,您必须将该源隔离在其自己的调度器中,否则合并会在订阅另一个源之前尝试将其耗尽。
您在这里创建了一个没有专用调度程序的无限源,因此它试图在合并之前完全耗尽该源 - 这就是您遇到问题的原因。
这在您的实际用例中可能不是问题,因为GET
请求的结果可能不会是无限的。但是,如果您想确保结果是交错的,您只需要确保使用自己的调度程序设置每个通量(通过调用subscribeOn(Schedulers.elastic());
每个通量)。
所以你的例子就变成了:
Flux<Integer> flux1 = Flux.<Integer>generate(emitter -> emitter.next(1))
.subscribeOn(Schedulers.elastic());
Flux<Integer> flux2 = Flux.<Integer>generate(emitter -> emitter.next(2))
.subscribeOn(Schedulers.elastic());
Flux<Integer> merged = flux1.mergeWith(flux2);
merged.subscribe(s -> System.out.println(s));
Thread.currentThread().join();
推荐阅读
- visual-studio-2017 - ClickOnce 安装打开失败
- swift4 - Swift 4 如何从字典数组中返回一个较小的数组
- go - 客户端同步中的竞争条件
- jenkins - org.apache.maven.plugins:maven-deploy-plugin:jar:2.7 的 POM 丢失,没有可用的依赖信息
- express - express js空白根页面
- postgresql - 将带有多边形的 CSV 文件插入 PostgreSQL
- mongodb - GridFS put 命令在事务中使用时从 pymongo 挂起
- powershell - 仅当一组值时才添加方括号
- angular - 两种方式数据绑定不适用于Angular 6多选
- python - 无法使用 Selenium 和 Beatifulsoup 在元标记中找到特定文本