rx-java2 - 处置 Disposable 是否保证将来没有电话?
问题描述
处理Disposable
退货是否Observable#subscribe(Consumer)
保证Consumer
不会收到进一步的电话accept
?如果没有,我怎样才能获得这样的保证?
如果有所不同,Observable
则会观察到 并将Disposable
设置在同一个单线程Scheduler
中。
解决方案
处理从 Observable#subscribe(Consumer) 返回的 Disposable 是否保证 Consumer 不会收到进一步的接受调用?
没有这样的硬保证。RxJava 2 尽最大努力在异步取消时停止发射,但发出取消的线程可能会稍微延迟,以至于从外部角度来看,一个或多个项目仍然漏掉了。
如果没有,我怎样才能获得这样的保证?
您可以使事情变得更加急切和相互排斥,但在异步世界中,通常很难分辨首先发生了什么或在一个动作期间发生了什么。
如果有所不同,则观察 Observable 并将 Disposable 放置在同一个单线程调度程序中。
如果取消是按顺序发生的,例如 using take(2)
,同步连接的上游将停止发送项目:range(1, 5).take(2)
永远不会尝试发出3, 4, 5
。然而,
range(1, 5)
.map(v -> v + 1)
.observeOn(io())
.take(2)
.subscribe()
可能在 take 甚至看到 value 之前运行1..5
并执行映射2
。
根据您拥有的流程类型,将任务发送到调度程序以在该调度程序上处理正在运行的序列可能永远不会执行,因为它被当前发出的任务阻塞,因此以下不会停止序列:
var single = Schedulers.single();
var dispose = single.scheduleDirect(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("Processing...");
}
});
// this will not execute
single.scheduleDirect(() -> dispose.dispose());
// but this will stop the processing
dispose.dispose();
Thread.sleep(1000);
推荐阅读
- python - 调用 keras 自定义指标进行预测
- sql-server - 罗马尼亚字符“ă”在 SQL Server 中等于“a”
- python - 如何使用多个列表执行计算?
- spring-boot - 由于多个感叹号,SpringBoot 无法从 Runnable jar 加载属性文件
- docker - 如何为以下命令创建 docker 文件
- go - gofpdf - 居中对齐文本之间的粗体文本
- javascript - 将 Angular 书面扩展迁移到清单 V3
- javascript - 我的 React 应用程序从不加载 localhost:3000 它只是一直在旋转(认为 React-Router-Dom 是问题所在)
- c - Openssl 1.1.1 库在自签名签名上出现错误
- python - 如何使用 RL 代码中的嵌套 if 向量化 for 循环?