首页 > 解决方案 > 根据没有 GroupBy 的条件将 Flux 一分为二

问题描述

对于给定的数据流:

Flux<Integer> evenFlux = Flux.just(1, 2, 3, 4, 5, 6, 7)
                         .filter(i -> i % 2 == 0)

Flux<Integer> oddFlux = Flux.just(1, 2, 3, 4, 5, 6, 7)
                         .filter(i -> i % 2 != 0)

相反,我如何使用单个管道进行拆分?所有过滤的元素到一个Flux,丢弃的元素到另一个Flux。使用onDiscardHook什么的?

注意:我需要对repeat()新的Flux. 无法在 上执行重复GroupedFlux

标签: javaproject-reactor

解决方案


这取决于,你有两个选择:

选项 1 是从热输入源创建两个新管道:

ConnectableFlux<Integer> input = Flux.range(1,7).publish();

//Pipeline 1
input.filter(i -> i % 2== 0)
    .subscribe(e -> System.out.println("Even stream value: " + e));

//Pipeline 2
input.filter(i -> i % 2 != 0)
    .subscribe(o -> System.out.println("Odd stream value: " + o));

input.connect();

选项 2 是在单个订阅者中处理结果,例如按以下方式分组:

Flux.just(1,2,3,4,5,6,7)
    .subscribe(n -> {
      if (n % 2 == 0) {
        System.out.println("Even stream value: " + n);
      } else {
        System.out.println("Odd stream value: " + n);
      }
    });

推荐阅读