首页 > 解决方案 > RxJava2 - 使用 PublishSubject 发射项目

问题描述

我有一个场景

subject1: PublishSubjectsubject2:BehaviorSubject

首先,我为 发射单个项目subject1,然后为 发射项目subject2,但在那之后我还想向 发射不同的项目subject1

fun emittingItems() {
    subject1.onNext(functionA1)
    subject2.onNext(functionB)
    if (something) subject1.onNext(functionA2)
}

发生的情况是,我按以下顺序收到一个项目:functionA1, functionA2, functionB

为什么我会出现这种行为?如何按以下顺序发出项目:functionA1, functionB, functionA2.

订阅主题:

val disposable = viewModel.subject1
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(this::someFunction())
disposables.add(disposable)

标签: androidrx-java2subject

解决方案


observeOn(AndroidSchedulers.mainThread())您一起安排主线程上的事件传播。调度本身是顺序的,而每个调度Runnable可能会处理添加到用于它的队列中的多个元素。

这是一种竞争条件,在调用emittingItems()主线程本身时肯定会出现,并且在从任何其他线程调用它时可能会出现。

但是由于您正在处理两个不同的异步流,因此您不能期望在两个不同的观察者中进行任何顺序观察。

您可以通过将两个源合并为一个流来实现给定的目标:

Observable.merge(subject1, subject2)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(subject);

推荐阅读