首页 > 解决方案 > 在 RxJava doOnNext() 调用中改变值是否安全?

问题描述

我正在使用一个代码库,其中我们有很多类似的模式:

getPoJoObservableFromSomewhere() // no guarantees about the threading/scheduling here
  .doOnNext(pojo -> System.out.println("got pojo: " + pojo.toString())
  .doOnNext(pojo -> pojo.setName("my new pojo"))
  .observeOn(newThreadScheduler)
  .flatMap(pojo -> doSomethingWithPojo(pojo)) // no guarantees about the threading/scheduling in doSomethingWithPojo()
  .subscribe(pojo -> System.out.println("Got a pojo: " + pojo.toString()));

或者更有可能:

Pojo myPojo = new Pojo("old name");
getNameFromSomewhere() // intentionally no guarantee what scheduler or thread this is observed on for this example
   .doOnNext(pojo -> pojo.setName("new name"))
   .flatMap(pojo -> doSomethingWithPojo(pojo)) // again, no guarantees about the threading inside this call
   .subscribe(pojo -> (), error -> System.out.println("Error handling goes here"));

假设 Pojo 对象是没有副作用的纯数据对象,看起来有点像:

class Pojo {
     private String name;
     public Pojo(String name) { this.name = name; }
     public String getName() { return name; }
     public void setName(String name) { this.name = name; }
}

这种模式可以避免细微的线程错误吗?

我能想到的最大障碍是记忆障碍,但我还不足以成为专家,无法忽视可能存在我不知道的事情的想法。

例如,内存屏障是否到位,以便在doOnNext()调用中发生变异后,所有写入都会被提交,并且会被 flatMap 内部发生的任何事情拾取doSomethingWithPojo()

简而言之:这些模式安全吗?

(我们的代码库是 RxJava 1,但这个问题同样适用于 RxJava 2)

标签: javarx-javareactive-programmingrx-java2

解决方案


这取决于上游是否同时发出元素这个 SO展示了一个如何以Subject非线程安全的方式运行的示例。

    AtomicInteger counter = new AtomicInteger();

    // Thread-safe
    // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();

    // Not Thread Safe
    PublishSubject<Object> subject = PublishSubject.create();

    Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);

    Consumer<Integer> sleep = (s) -> {
        try {
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    subject
            .doOnNext(i -> counter.incrementAndGet())
            .doOnNext(i -> counter.decrementAndGet())
            .doOnNext(print)
            .filter(i -> counter.get() != 0)
            .doOnNext(i -> {
                        throw new NullPointerException("Concurrency detected");
                    }
            )
            .subscribe();

    Runnable r = () -> {
        for (int i = 0; i < 100000; i++) {
            sleep.accept(1);
            subject.onNext(i);
        }
    };

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(r);
    pool.execute(r);

推荐阅读