java - 合并热通量源
问题描述
在带有 Reactor 的 Spring Boot 2 中,我试图合并两个Flux
热源。但是,merge
似乎唯一报告Flux
了merge
. 我如何merge
才能识别第二个Flux
。
在下面的示例中,System.err
inB-2
甚至不打印 whenoutgoing1a
是第一个参数。如果我制作outgoing2
第一个,则A-2
不会打印。
以下是完整示例;
package com.example.demo;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Weather {
String city;
Integer temperature;
public Weather(String city, Integer temperature) {
this.city = city;
this.temperature = temperature;
}
@Override
public String toString() {
return "Weather [city=" + city + ", temperature=" + temperature + "]";
}
public static void main(String[] args) {
BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();
// Assume Spring @Repository "A-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
queue.add(new Weather(s, d));
try { Thread.sleep(250); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Repository "B-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"MOS", "TLV"}) {
queue2.add(new Weather(s, d));
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue.take());
System.err.println("1 " + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-1"));
// Assume Spring @Service "B-2" = real-time MOS, TLV
Flux<Weather> outgoing2 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue2.take());
System.err.println("2 " + queue2.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-2"));
// Assume Spring @Service "A-3" = 5 second summary of LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1a = Flux.from(outgoing1)
.groupBy(c -> c.city)
.flatMap(g -> g
.sample(Duration.ofSeconds(5))
)
.log("C");
// Assume Spring @Service "C" - merges "A-3" and "B-2"
// only prints outgoing1a
Flux.merge(outgoing1a, outgoing2).subscribe(System.out::println);
// only prints outgoing2
//Flux.merge(outgoing2, outgoing1a).subscribe(System.out::println);
}
}
解决方案
这里有几件事在起作用。
- 请注意运营商的以下建议
.merge
...
请注意,合并适用于异步源或有限源。在处理尚未在专用调度器上发布的无限源时,您必须将该源隔离在其自己的调度器中,否则合并会在订阅另一个源之前尝试将其耗尽。
您的出站 Flux 使用
.publishOn
,但这只会影响链接在操作员之后的.publishOn
操作员。即它不会影响之前的任何内容.publishOn
。具体来说,它不会影响 lambda 中的代码传递给Flux.create
执行的线程。如果您在每个出站通量之前添加,您可以看到这一点.log()
。.publishOn
您的 lambda 传递给
Flux.create
调用一个阻塞方法 (queue.take
)。
由于您subscribe(...)
在线程中调用合并的 Flux,因此main
您的 lambda 传递给Flux.create
在线程中执行main
,并阻止它。
最简单的解决方法是使用.subscribeOn
而不是.publishOn
让传递给的 lambda 中的代码在Flux.create
不同的线程(除了main
)上运行。这将防止main
线程阻塞,并允许两个出站流的合并输出交错。
推荐阅读
- javascript - React 表单上传自动提交,取决于订单
- python-3.x - CSV 写入器写入设置为单行而不是多行
- node.js - 使用打字稿进行WebStorm远程调试...如何?
- gremlin - Gremlin,其中查询不会使用标记顶点中的值
- c# - 在我的计算机上将 VM 客户端连接到服务器 C#
- c# - ComboBox.SelectedValue 返回 ToString() 而不是 Name 属性
- python - 如何在python中绘制具有不同X索引的多条线
- ios - Swift:如何从导航控制器过渡到标签栏控制器
- javascript - 如何使用 axios 和文件 URL 读取文件内容?
- python - 在python中使用步长计算滑动窗口