project-reactor - Reactor Sinks:如何控制 onNext() 信号的调用?
问题描述
给定一个重放接收器,是否可以确保在继续执行之前成功向下游发出 onNext() 信号?
显示此问题的一个简单示例是:
@Test
void sinksEmitDelayTest() throws InterruptedException {
Sinks.Many<State> stateSink = Sinks.many().replay().limit(1);
CountDownLatch countDownLatch = new CountDownLatch(1);
stateSink
.asFlux()
.switchMap(state -> Mono
.delay(Duration.ofMillis(1000))
.flatMap(i -> {
if (state.on)
return Mono.error(new Throwable("State must be off"));
else
return Mono.just(state);
})
)
.subscribe(state -> countDownLatch.countDown());
State state = new State();
if (stateSink.tryEmitNext(state) == Sinks.EmitResult.OK)
state.on = true;
else
throw new RuntimeException("failed to emit");
Assertions.assertTrue(countDownLatch.await(2000, TimeUnit.MILLISECONDS));
}
这里的问题是 ,tryEmitNext
实际上并没有调用onNext
,而是在 中添加了一个新Node
的ReplayBuffer
,因此稍后调用onNext
,因此允许在 能够对状态起作用之前切换on
到。true
switchMap
注意:测试甚至没有延迟就失败了。延迟只是为了确保它始终失败。
注意: 可能有任意多的订阅者,在切换到stateSink
时不一定知道。on
true
编辑:
对于实际的静态情况,一种解决方法如下,但是,感觉并不好,看起来也不是真正的线程安全,如果在for
循环等待时添加新订阅者,很可能会失败:
@Test
void sinksEmitDelayTest() throws InterruptedException {
Sinks.Many<State> stateSink = Sinks.many().replay().limit(1);
CountDownLatch countDownLatch = new CountDownLatch(1);
AtomicInteger subscriptionControl = new AtomicInteger(0);
stateSink
.asFlux()
.switchMap(state -> Mono
.delay(Duration.ofMillis(10))
.flatMap(i -> {
if (state.on)
return Mono.error(new Throwable("State must be off"));
else
return Mono.just(state);
})
.doOnNext(next -> subscriptionControl.decrementAndGet())
)
.subscribe(state -> countDownLatch.countDown());
State state = new State();
subscriptionControl.set(stateSink.currentSubscriberCount());
if (stateSink.tryEmitNext(state) == Sinks.EmitResult.OK)
for (; ; ) {
if (subscriptionControl.get() == 0) {
state.on = true;
break;
}
}
else
throw new RuntimeException("failed to emit");
Assertions.assertTrue(countDownLatch.await(2000, TimeUnit.MILLISECONDS));
}
解决方案
推荐阅读
- c - 我正在尝试将第一个单词提取为字符串并将其用作字符
- installation - 如果包大小很大,NSIS Edit 不会提取所有文件和文件夹
- react-native - React Native Image url 作为参数
- php - 尝试使用 PHP 从 2 个数据库中获取数据
- ruby-on-rails - 如何检查日期范围是否在Ruby中的另一个日期范围之间
- php - 如何在 octobercms 后端使用 CURL 获取网站数据?
- blueprism - 从excel中的单元格值获取单元格引用
- css - Bootstrap 4:依赖于纵横比的响应式布局
- twig - 在类别页面上显示产品属性 - Opencart 3
- arrays - 在多个值离子后过滤