首页 > 解决方案 > 处置 Disposable 是否保证将来没有电话?

问题描述

处理Disposable退货是否Observable#subscribe(Consumer)保证Consumer不会收到进一步的电话accept?如果没有,我怎样才能获得这样的保证?

如果有所不同,Observable则会观察到 并将Disposable设置在同一个单线程Scheduler中。

标签: rx-java2

解决方案


处理从 Observable#subscribe(Consumer) 返回的 Disposable 是否保证 Consumer 不会收到进一步的接受调用?

没有这样的硬保证。RxJava 2 尽最大努力在异步取消时停止发射,但发出取消的线程可能会稍微延迟,以至于从外部角度来看,一个或多个项目仍然漏掉了。

如果没有,我怎样才能获得这样的保证?

您可以使事情变得更加急切和相互排斥,但在异步世界中,通常很难分辨首先发生了什么或在一个动作期间发生了什么。

如果有所不同,则观察 Observable 并将 Disposable 放置在同一个单线程调度程序中。

如果取消是按顺序发生的,例如 using take(2),同步连接的上游将停止发送项目:range(1, 5).take(2)永远不会尝试发出3, 4, 5。然而,

range(1, 5)
.map(v -> v + 1)
.observeOn(io())
.take(2)
.subscribe()

可能在 take 甚至看到 value 之前运行1..5并执行映射2

根据您拥有的流程类型,将任务发送到调度程序以在该调度程序上处理正在运行的序列可能永远不会执行,因为它被当前发出的任务阻塞,因此以下不会停止序列:

var single = Schedulers.single();

var dispose = single.scheduleDirect(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        System.out.println("Processing...");
    }
});

// this will not execute
single.scheduleDirect(() -> dispose.dispose());

// but this will stop the processing
dispose.dispose();

Thread.sleep(1000);

推荐阅读