首页 > 解决方案 > 为什么我在单元测试中的 RxJava 订阅丢失而没有被处置

问题描述

我正在尝试为基于 RxJava 的视图模型编写单元测试。在生产中它运行良好,但在我的单元测试中,我的一个订阅在没有 dispose 或任何被调用的情况下丢失了。

我正在使用 MVI 模式。出于简单的原因,所有类(Intent、Action、Result、ViewState)都只有一个文本。

public class ViewModel {
    private static final String TAG = ViewModel.class.getSimpleName();
    private final ISchedulerProvider schedulerProvider;
    private Observable<ViewState> viewState;
    private PublishSubject<Intent> intentEmitter = PublishSubject.create();
    private IActionProcessHolder<Action, Result> actionProcessHolder;

    public ViewModel(ISchedulerProvider schedulerProvider, IActionProcessHolder actionProcessHolder){
        this.schedulerProvider = schedulerProvider;
        this.actionProcessHolder = actionProcessHolder;
        viewState = compose();
    }

    private Observable<ViewState> compose() {
        return intentEmitter
                .compose(getIntentToActionTransformer())
                .compose(actionProcessHolder.getActionProcessor())
                .map(result -> new ViewState(result.getText()));
    }

    public void processIntents(Observable<Intent> intent){
        intent
                .subscribeOn(schedulerProvider.io())
                .observeOn(schedulerProvider.io())
                .subscribe(
                        anIntent -> intentEmitter.onNext(anIntent),
                        error -> Log.e(TAG, error.getMessage())
                );
    }

    public Observable<ViewState> getViewState(){
        return viewState;
    }

    private ObservableTransformer<Intent, Action> getIntentToActionTransformer(){
        return intent ->
                intent.map(theIntent -> new Action(theIntent.getText()));
    }
}
public class ATestScheduler implements ISchedulerProvider {

    @Override
    public Scheduler computation() { return Schedulers.trampoline(); }

    @Override
    public Scheduler io() { return Schedulers.trampoline(); }

    @Override
    public Scheduler ui() { return Schedulers.trampoline(); }
}
public class Intent {
    String text;
    public Intent(String text){ this.text = text; }
    public String getText(){ return text; }
}
public class ActionProcessHolder implements IActionProcessHolder<Action,Result>{
    @Override
    public ObservableTransformer<Action,Result> getActionProcessor(){
        return action -> action.map(theAction -> new Result(theAction.getText()));
    }
}
public class ViewModelTest {

    @Mock
    private ObservableTransformer<Action, Result> actionProcessor;
    @Mock
    private IActionProcessHolder<Action, Result> actionProcessHolder;
    private TestObserver<ViewState> testObserver;
    private ViewModel viewModel;
    private CompositeDisposable subscriptions = new CompositeDisposable();

    @Before
    public void setup(){
        MockitoAnnotations.initMocks(this);
        ISchedulerProvider schedulerProvider = new ATestScheduler();
        testObserver = new TestObserver<>();

        when(actionProcessHolder.getActionProcessor()).thenReturn(actionProcessor);
        when(actionProcessor.apply(any())).thenReturn(Observable.just(new Result("mocked text")));

        viewModel = new ViewModel(schedulerProvider, actionProcessHolder);
        subscriptions.add(viewModel.getViewState()
            .observeOn(schedulerProvider.ui())
            .subscribe(
                    viewState -> testObserver.onNext(viewState)
                    , error -> Assert.fail(error.getMessage())));
    }

    @Test
    public void testProcessIntent(){
        viewModel.processIntents(Observable.just(new Intent("newText")));
        testObserver.assertValueAt(0, viewState -> viewState.getText().equals("mocked text"));
        verify(actionProcessor, times(2)).apply(any());
    }

我期望 actionProcessor 被调用两次,一次是在初始化期间,一次是在我在实际测试中处理另一个意图时。然而,当 Intent 被处理时,视图模型中的 intentEmitter 不再有任何订阅者,所以 onNext 什么也不做。

我已经计算出 ViewModel 类中的行:

.compose(actionProcessHolder.getActionProcessor())

似乎打破它。当我简单地将我的操作映射到这样的结果时:

.map(action -> new Result(action.getText()))

(与 actionProcessor 应该做的一样)订阅没有被释放。我在订阅被处理的任何地方都设置了断点,但没有一个被调用过。有谁知道为什么会这样?

标签: javaunit-testingrx-java2

解决方案


问题实际上不是订阅被处理,而是订阅链从未建立。在设置期间,当调用 subscribe 时,使用此行

when(actionProcessor.apply(any())).thenReturn(Observable.just(new Result("mocked text")));

Mockito 立即导致 Observable 发出它的值并中断了订阅过程。我现在通过在不使用 Mockito 的情况下模拟 actionProcessor 来解决它,只需将上面的行替换为:

actionProcessor = action -> action.map(ignore -> new Result("mocked text"));


推荐阅读