java - 在 RxJava doOnNext() 调用中改变值是否安全?
问题描述
我正在使用一个代码库,其中我们有很多类似的模式:
getPoJoObservableFromSomewhere() // no guarantees about the threading/scheduling here
.doOnNext(pojo -> System.out.println("got pojo: " + pojo.toString())
.doOnNext(pojo -> pojo.setName("my new pojo"))
.observeOn(newThreadScheduler)
.flatMap(pojo -> doSomethingWithPojo(pojo)) // no guarantees about the threading/scheduling in doSomethingWithPojo()
.subscribe(pojo -> System.out.println("Got a pojo: " + pojo.toString()));
或者更有可能:
Pojo myPojo = new Pojo("old name");
getNameFromSomewhere() // intentionally no guarantee what scheduler or thread this is observed on for this example
.doOnNext(pojo -> pojo.setName("new name"))
.flatMap(pojo -> doSomethingWithPojo(pojo)) // again, no guarantees about the threading inside this call
.subscribe(pojo -> (), error -> System.out.println("Error handling goes here"));
假设 Pojo 对象是没有副作用的纯数据对象,看起来有点像:
class Pojo {
private String name;
public Pojo(String name) { this.name = name; }
public String getName() { return name; }
public void setName(String name) { this.name = name; }
}
这种模式可以避免细微的线程错误吗?
我能想到的最大障碍是记忆障碍,但我还不足以成为专家,无法忽视可能存在我不知道的事情的想法。
例如,内存屏障是否到位,以便在doOnNext()
调用中发生变异后,所有写入都会被提交,并且会被 flatMap 内部发生的任何事情拾取doSomethingWithPojo()
?
简而言之:这些模式安全吗?
(我们的代码库是 RxJava 1,但这个问题同样适用于 RxJava 2)
解决方案
这取决于上游是否同时发出元素。这个 SO展示了一个如何以Subject
非线程安全的方式运行的示例。
AtomicInteger counter = new AtomicInteger(); // Thread-safe // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized(); // Not Thread Safe PublishSubject<Object> subject = PublishSubject.create(); Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter); Consumer<Integer> sleep = (s) -> { try { Thread.sleep(s); } catch (InterruptedException e) { e.printStackTrace(); } }; subject .doOnNext(i -> counter.incrementAndGet()) .doOnNext(i -> counter.decrementAndGet()) .doOnNext(print) .filter(i -> counter.get() != 0) .doOnNext(i -> { throw new NullPointerException("Concurrency detected"); } ) .subscribe(); Runnable r = () -> { for (int i = 0; i < 100000; i++) { sleep.accept(1); subject.onNext(i); } }; ExecutorService pool = Executors.newFixedThreadPool(2); pool.execute(r); pool.execute(r);
推荐阅读
- javascript - 如何在不刷新页面的情况下重置输入值
- python - 如何设计数据提供者
- php - 从 PHP 调用 Powershell 脚本
- python - 我们可以对从预训练模型中提取的特征进行归一化吗
- r - 较长的对象长度不是较短对象长度的倍数 [错误]
- c# - 无法从子窗体调用 ParametizedThread
- r - 在 R 中将行名称从一个数据帧附加到另一个具有不同维度的数据帧
- python - 检查目标时出错:预期 activation_5 的形状为 (1,) 但得到的数组的形状为 (2,)
- c - 在链表中使用 malloc 后的 Free()
- android - 如何在 RecyclerView Kotlin 中添加 Header