首页 > 解决方案 > 继续 zip(),如果一个源已完成

问题描述

我在使用 .zip() 运算符时遇到了一些问题。

让我用一个小例子来简化我的问题。

    Flux<Integer> flux1 = Flux.just(9, 8, 3, -2);
    Flux<Integer> flux2 = Flux.just(7);
    Flux<Integer> flux3 = Flux.just(6, 5, 4, -4);

    List<Flux<Integer>> list1 = Arrays.asList(flux1, flux2, flux3);

    TreeSet<Integer> set = new TreeSet<>(Comparator.reverseOrder());

    Set<Integer> list = Flux.zip(list1, objects -> {
        boolean setChanged = false;
        for (Object o : objects) {
            Integer i = (Integer) o;

            if (set.size() < 5 || i > set.last()) {
                setChanged = true;
                set.add(i);
                if (set.size() > 5) {
                    set.pollLast();
                }
            }
        }

        return setChanged;
    }).takeWhile(val -> val)
            .then(Mono.just(set))
            .block();

    System.out.println(list);

在这里,我有 3 个不同的来源(默认情况下它们按降序排序,并且它们的数量可能会更大),我想从它们中获取 5 个按降序排序的元素的集合。不幸的是,我不能只使用 concat() 或 merge() 运算符,因为现实生活中的源可能非常大,但我只需要少量元素。

我期待 [9, 8, 7, 6, 5] 在这里,但其中一个来源是在第一次压缩后完成的。

你能建议我如何解决这个问题吗?

标签: spring-webfluxproject-reactorfluxreactor

解决方案


你可以试试reduce操作

    @Test
    void test() {
        Flux<Integer> flux1 = Flux.just(9, 8, 3, -2);
        Flux<Integer> flux2 = Flux.just(7, 0, -2, 4,3,2,2,1);
        Flux<Integer> flux3 = Flux.just(6, 5, 4, -4);

        var k = 5;
        List<Flux<Integer>> publishers = List.of(flux1, flux2, flux3);
        var flux = Flux.merge(publishers)
                .log()
                .limitRate(2)
                .buffer(2)
                .reduce((l1, l2) -> {
                    System.out.println(l1);
                    System.out.println(l2);

                    return Stream.concat(
                            l1.stream(),
                            l2.stream()
                    ).sorted(Comparator.reverseOrder())
                    .limit(k)
                    .collect(Collectors.toList());
                })
                .log();

        StepVerifier.create(flux)
                .expectNext(List.of(9,8,7,6,5))
                .expectComplete()
                .verify();
    }

您可以分块获取数据并比较它们以找到前 K 个元素。

在顺序情况下,它将获取一个新批次,将其与当前的前 k 个结果进行比较,并返回一个新的顶层,如上例所示(如果 k 很大,PriorityQueue 可能更适合排序)。如果您使用并行调度程序并且批处理是并行获取的,那么它可以独立地将它们相互比较,这应该会更快一些。您还可以通过 rateLimit、缓冲区、delayElements 等完全控制获取的数据


推荐阅读