rx-java - Rxjava:当 switchmap 发生时发出减少值
问题描述
reduce
运算符在可观察对象的末尾(完成时)发出值。
我正在寻找一种reduce
在switchmap
. 当外部可观察值发出值或完成时,我想要无限的内部可观察值。sum
@Test
public void emit_value_when_switchmap() throws InterruptedException {
Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
.switchMapMaybe(
l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.reduce(Long::sum)
.map(a -> a + ": Final")
)
.subscribe(e -> System.out.println(e));
Thread.sleep(10000);
}
此图说明了想要的行为:
//events: --------x-----1----2---1---x-----3--0--------x-1---1----|
//result: ---------------------------4-----------------3----------2
解决方案
这可能不是最好的方法,但它现在可以完成工作,直到有人想出一种更奇特的方法来解决您的用例。
请看一下我的测试,我认为它可以解决您的问题:
环境:(gradle——groovy)
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"
测试 3 的发射是从可观察的源进行的。每次发出一个新值时,都会订阅内部的 observable。当发出一个新值时,内部可观察对象完成并将该值推送到下游。然后将通过订阅新的内部流来处理新发出的值。
@Test
public void takeWhileReduce() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> source = PublishSubject.create();
Observable<Long> publish = source.publish(
multicast -> {
return multicast.flatMap(
o -> {
return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
.takeUntil(multicast)
.reduce(Long::sum)
.toObservable();
},
1);
});
TestObserver<Long> test = publish.test();
source.onNext(42);
scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1,2,3
test.assertValuesOnly(6L);
// next value is flatMapped
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1,2
test.assertValuesOnly(6L, 3L);
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1
test.assertValuesOnly(6L, 3L, 1L);
}
推荐阅读
- javascript - 单击提交按钮时滚动到角度反应形式的错误字段
- javascript - 无法显示 p5 JavaScript 背景图片
- regex - vim 替换 : # 用于单个 $ 而如果 $$ 则无事可做
- python - 在 python 上写入 CSV 文件的问题
- python - PHP加密和解密函数到python
- javascript - 来自现有数组的表单中的 React Dropdown
- powershell - Microsoft Teams:将成员从一个 Teams 组迁移到另一个
- php - 使用 handler.postDelayed (Android) 创建实时应用程序?
- jmeter - 在高斯分布中生成http请求
- python - 使用 python 从 FQDN 获取 url