首页 > 解决方案 > Rxjava:当 switchmap 发生时发出减少值

问题描述

reduce运算符在可观察对象的末尾(完成时)发出值。

我正在寻找一种reduceswitchmap. 当外部可观察值发出值或完成时,我想要无限的内部可观察值sum

@Test
public void emit_value_when_switchmap() throws InterruptedException {

    Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
            .switchMapMaybe(
                    l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                            .reduce(Long::sum)
                            .map(a -> a + ": Final")
            )
            .subscribe(e -> System.out.println(e));


    Thread.sleep(10000);
}

此图说明了想要的行为:

//events: --------x-----1----2---1---x-----3--0--------x-1---1----|  
//result: ---------------------------4-----------------3----------2  

标签: rx-javarx-java2

解决方案


这可能不是最好的方法,但它现在可以完成工作,直到有人想出一种更奇特的方法来解决您的用例。

请看一下我的测试,我认为它可以解决您的问题:

环境:(gradle——groovy)

implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"

测试 3 的发射是从可观察的源进行的。每次发出一个新值时,都会订阅内部的 observable。当发出一个新值时,内部可观察对象完成并将该值推送到下游。然后将通过订阅新的内部流来处理新发出的值。

  @Test
  public void takeWhileReduce() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Long> publish = source.publish(
        multicast -> {
          return multicast.flatMap(
              o -> {
                return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
                    .takeUntil(multicast)
                    .reduce(Long::sum)
                    .toObservable();
              },
              1);
        });

    TestObserver<Long> test = publish.test();

    source.onNext(42);

    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);
    // assert - values emitted: 0,1,2,3
    test.assertValuesOnly(6L);

    // next value is flatMapped
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1,2
    test.assertValuesOnly(6L, 3L);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1
    test.assertValuesOnly(6L, 3L, 1L);
  }

推荐阅读