spring-webflux - 继续 zip(),如果一个源已完成
问题描述
我在使用 .zip() 运算符时遇到了一些问题。
让我用一个小例子来简化我的问题。
Flux<Integer> flux1 = Flux.just(9, 8, 3, -2);
Flux<Integer> flux2 = Flux.just(7);
Flux<Integer> flux3 = Flux.just(6, 5, 4, -4);
List<Flux<Integer>> list1 = Arrays.asList(flux1, flux2, flux3);
TreeSet<Integer> set = new TreeSet<>(Comparator.reverseOrder());
Set<Integer> list = Flux.zip(list1, objects -> {
boolean setChanged = false;
for (Object o : objects) {
Integer i = (Integer) o;
if (set.size() < 5 || i > set.last()) {
setChanged = true;
set.add(i);
if (set.size() > 5) {
set.pollLast();
}
}
}
return setChanged;
}).takeWhile(val -> val)
.then(Mono.just(set))
.block();
System.out.println(list);
在这里,我有 3 个不同的来源(默认情况下它们按降序排序,并且它们的数量可能会更大),我想从它们中获取 5 个按降序排序的元素的集合。不幸的是,我不能只使用 concat() 或 merge() 运算符,因为现实生活中的源可能非常大,但我只需要少量元素。
我期待 [9, 8, 7, 6, 5] 在这里,但其中一个来源是在第一次压缩后完成的。
你能建议我如何解决这个问题吗?
解决方案
你可以试试reduce操作
@Test
void test() {
Flux<Integer> flux1 = Flux.just(9, 8, 3, -2);
Flux<Integer> flux2 = Flux.just(7, 0, -2, 4,3,2,2,1);
Flux<Integer> flux3 = Flux.just(6, 5, 4, -4);
var k = 5;
List<Flux<Integer>> publishers = List.of(flux1, flux2, flux3);
var flux = Flux.merge(publishers)
.log()
.limitRate(2)
.buffer(2)
.reduce((l1, l2) -> {
System.out.println(l1);
System.out.println(l2);
return Stream.concat(
l1.stream(),
l2.stream()
).sorted(Comparator.reverseOrder())
.limit(k)
.collect(Collectors.toList());
})
.log();
StepVerifier.create(flux)
.expectNext(List.of(9,8,7,6,5))
.expectComplete()
.verify();
}
您可以分块获取数据并比较它们以找到前 K 个元素。
在顺序情况下,它将获取一个新批次,将其与当前的前 k 个结果进行比较,并返回一个新的顶层,如上例所示(如果 k 很大,PriorityQueue 可能更适合排序)。如果您使用并行调度程序并且批处理是并行获取的,那么它可以独立地将它们相互比较,这应该会更快一些。您还可以通过 rateLimit、缓冲区、delayElements 等完全控制获取的数据
推荐阅读
- python - Python 3 非阻塞同步行为
- c++ - 程序在运行时输出奇怪的随机值
- python - 如何使用描述和不同的填充颜色数组在散景上绘制散点图?
- c++ - QopenGLWidget paintGL 仅由 resizeGL 触发
- r - flextable 包在没有 <- 的情况下分配更改
- scala - 检查自定义序列中是否存在特定键/值
- python - 如何在 OpenCV2 中将 float32 图片写入视频文件?
- angular - 角度模拟服务器响应数据的更好方法?
- http - 是什么导致 nginx & (node / express) 在从上游读取响应标头时抛出错误:(104: Connection reset by peer)
- javascript - 了解在哪里放置 `await` 关键字