首页 > 解决方案 > 使用 LiveData 的 JUnit5 测试不执行订阅者的回调

问题描述

背景:

我有一个简单的应用程序,它使用 rests API 调用来获取电影列表。项目结构如下,

Activity -> ViewModel -> Repository -> ApiService (Retrofit Interface)
  1. Activity 订阅LiveData并监听事件变化

  2. ViewModel托管活动观察到的MediatorLiveData 。最初,ViewModel在MediatorLiveData中设置一个Resource.loading(..)值。

  3. 然后ViewModel调用存储库从ApiService获取电影列表

  4. ApiService返回一个LiveData或_Resource.success(..)Resource.error(..)

  5. 然后ViewModel将来自ApiService的LiveData结果合并到MediatorLiveData

我的查询:

在单元测试中,只有第一个发射Resource.loading(..)是由ViewModelMediatorLiveData发出的。MediatorLiveData永远不会从存储库中 发出任何数据。

ViewModel.class

private var discoverMovieLiveData: MediatorLiveData<Resource<DiscoverMovieResponse>> = MediatorLiveData()

fun observeDiscoverMovie(): LiveData<Resource<DiscoverMovieResponse>> {
        return discoverMovieLiveData
    }

fun fetchDiscoverMovies(page: Int) {

        discoverMovieLiveData.value = Resource.loading(null) // this emit get observed immediately 

        val source = movieRepository.fetchDiscoverMovies(page)
        discoverMovieLiveData.addSource(source) {
            discoverMovieLiveData.value = it // never gets called
            discoverMovieLiveData.removeSource(source)
        }
    } 

存储库.class

fun fetchDiscoverMovies(page: Int): LiveData<Resource<DiscoverMovieResponse>> {
        return LiveDataReactiveStreams.fromPublisher(
            apiService.fetchDiscoverMovies(page)
                .subscribeOn(Schedulers.io())
                .map { d ->
                    Resource.success(d) // never gets called in unit test
                }
                .onErrorReturn { e ->
                    Resource.error(ApiErrorHandler.getErrorByThrowable(e), null) // // never gets called in unit test
                }
        )
    }

单元测试

@Test
fun loadMovieListFromNetwork() {
        val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10)
        val call: Flowable<DiscoverMovieResponse> = successCall(mockResponse) // wraps the retrofit result inside a Flowable<DiscoverMovieResponse>
        whenever(apiService.fetchDiscoverMovies(1)).thenReturn(call)

        viewModel.fetchDiscoverMovies(1)

        verify(apiService).fetchDiscoverMovies(1)
        verifyNoMoreInteractions(apiService)

        val liveData = viewModel.observeDiscoverMovie()
        val observer: Observer<Resource<DiscoverMovieResponse>> = mock()
        liveData.observeForever(observer)

        verify(observer).onChanged(
            Resource.success(mockResponse) // TEST FAILS HERE AND GETS "Resource.loading(null)" 
        )
    }

Resource是一个通用的包装类,用于包装不同场景的数据,例如加载、成功、错误。

class Resource<out T>(val status: Status, val data: T?, val message: String?) {
.......
}

编辑:#1

出于测试目的,我更新了存储库中的 rx 线程以在主线程上运行它。这最终会导致Looper not mocked异常。

fun fetchDiscoverMovies(page: Int): LiveData<Resource<DiscoverMovieResponse>> {
            return LiveDataReactiveStreams.fromPublisher(
                apiService.fetchDiscoverMovies(page)
                    .subscribeOn(AndroidSchedulers.mainThread())
                    .map {...}
                    .onErrorReturn {...}
            )
        }

在测试课上,

@ExtendWith(InstantExecutorExtension::class)
class MainViewModelTest {

    companion object {
        @ClassRule
        @JvmField
        val schedulers = RxImmediateSchedulerRule()
    }

    @Test
        fun loadMovieListFromNetwork() {
        .....  
       }
}

}

RxImmediateSchedulerRule.class

class RxImmediateSchedulerRule : TestRule {

    private val immediate = object : Scheduler() {
        override fun createWorker(): Worker {
            return ExecutorScheduler.ExecutorWorker(Executor { it.run() })
        }
    }

    override fun apply(base: Statement, description: Description): Statement {
        return object : Statement() {
            @Throws(Throwable::class)
            override fun evaluate() {
                RxJavaPlugins.setInitIoSchedulerHandler { immediate }
                RxJavaPlugins.setInitComputationSchedulerHandler { immediate }
                RxJavaPlugins.setInitNewThreadSchedulerHandler { immediate }
                RxJavaPlugins.setInitSingleSchedulerHandler { immediate }
                RxAndroidPlugins.setInitMainThreadSchedulerHandler { immediate }

                try {
                    base.evaluate()
                } finally {
                    RxJavaPlugins.reset()
                    RxAndroidPlugins.reset()
                }
            }
        }
    }

}

InstantExecutorExtension.class

class InstantExecutorExtension : BeforeEachCallback, AfterEachCallback {

    override fun beforeEach(context: ExtensionContext?) {
        ArchTaskExecutor.getInstance().setDelegate(object : TaskExecutor() {
            override fun executeOnDiskIO(runnable: Runnable) {
                runnable.run()
            }

            override fun postToMainThread(runnable: Runnable) {
                runnable.run()
            }

            override fun isMainThread(): Boolean {
                return true
            }
        })
    }

    override fun afterEach(context: ExtensionContext?) {
        ArchTaskExecutor.getInstance().setDelegate(null)
    }

}

标签: androidjunitmockitoandroid-livedataandroid-viewmodel

解决方案


您指定的方式RxImmediateSchedulerRule不适用于 JUnit5。如果你在apply()方法中放置一个断点,你会看到它没有被执行。

相反,您应该创建一个此处指定的扩展:


    class TestSchedulerExtension : BeforeTestExecutionCallback, AfterTestExecutionCallback {

        override fun beforeTestExecution(context: ExtensionContext?) {
            RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }
            RxJavaPlugins.setComputationSchedulerHandler { Schedulers.trampoline() }
            RxJavaPlugins.setNewThreadSchedulerHandler { Schedulers.trampoline() }
            RxAndroidPlugins.setMainThreadSchedulerHandler { Schedulers.trampoline() }
        }

        override fun afterTestExecution(context: ExtensionContext?) {
            RxJavaPlugins.reset()
            RxAndroidPlugins.reset()
        }

    }

然后TestSchedulerExtension在测试类的注释中应用如下:


    @ExtendWith(value = [InstantExecutorExtension::class, TestSchedulerExtension::class])
    class MainViewModelTest {

        private val apiService: ApiService = mock()
        private lateinit var movieRepository: MovieRepository
        private lateinit var viewModel: MainViewModel

        @BeforeEach
        fun init() {
            movieRepository = MovieRepository(apiService)
            viewModel = MainViewModel(movieRepository)
        }

        @Test
        fun loadMovieListFromNetwork() {
            val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10, 0, "", false)
            val call: Flowable = Flowable.just(mockResponse)
            whenever(apiService.fetchDiscoverMovies(1)).thenReturn(call)

            viewModel.fetchDiscoverMovies(1)

            assertEquals(Resource.success(mockResponse), LiveDataTestUtil.getValue(viewModel.discoverMovieLiveData))
        }

    }

现在测试将通过。现在你已经测试过了,那个观察者已经被发送了期望值。


从另一个角度:这是单元测试吗?肯定不是,因为在这个测试中,我们正在与 2 个单元交互:MainViewModelMovieRepository. 这更符合“集成测试”的术语。如果你嘲笑了MoviesRepository,那么这将是一个有效的单元测试:


@ExtendWith(value = [InstantExecutorExtension::class, TestSchedulerExtension::class])
class MainViewModelTest {

    private val movieRepository: MovieRepository = mock()
    private val viewModel = MainViewModel(movieRepository)

    @Test
    fun loadMovieListFromNetwork() {
        val mockResponse = DiscoverMovieResponse(1, emptyList(), 100, 10, 0, "", false)
        val liveData =
            MutableLiveData>().apply { value = Resource.success(mockResponse) }
        whenever(movieRepository.fetchDiscoverMovies(1)).thenReturn(liveData)

        viewModel.fetchDiscoverMovies(1)

        assertEquals(Resource.success(mockResponse), getValue(viewModel.discoverMovieLiveData))
    }

}

注意,MovieRepository应该open与 with 一起声明fetchDiscoverMovies()以便能够模拟它。或者,您可以考虑使用kotlin-allopen插件。


推荐阅读