android - 链式调用android中Rxjava中的异常(RxJavaAssemblyException:组装)
问题描述
我正在尝试在 RxJava 中进行调用。我可能在通话中做了太多的连锁反应。有太多的 Observable 和转换,恐怕组装丢失了。不幸的是,我需要使用这些功能。我在这里有一个方法,我在我的片段中调用它。当我调试并设置断点时,该方法永远不会被调用。
在我的片段中,我在这里调用了 offlineItems():
private fun streamDownloads(): Observable<Unit> {
return downloadsDataRepository.offlineItems()
.observeOn(AndroidSchedulers.mainThread())
.map { downloadLoaded(it) } // Exception here.
}
不调用 downloadLoaded(it),有时它会返回异常。我担心我在 RxJava 中映射了过于复杂的链式调用。这是offlineItems() 调用。
fun offlineItems(): Observable<List<MediaItem>> {
val list = getAllMyDownloadedMediaItems()
return list.flatMapIterable { it }
.flatMap { mediaStore.getMediaItemWithId(MediaId(it.request.id)) } // this method is called .
.toList() // returns an Observable<MediaItem>
.toObservable()
}
为了彻底, mediaStoreCall 是:
override fun getMediaItemWithId(mediaId: MediaId): Observable<MediaItem> {
return if (isOnline()) {
Observable.just(Unit)
.effectMap { fetchAndStoreRemote(mediaId) }
.flatMap { mediaItemFromDB(mediaId) }
} else {
mediaItemFromDB(mediaId)
}
}
我得到的列表(从 exoplayer 下载):
fun getAllMyDownloadedMediaItems(): Observable<List<Download>> {
return Observable.just(downloadManager.downloadIndex.getDownloads(Download.STATE_COMPLETED).use { index ->
mutableListOf<Download>().apply {
if (index.isFirst || index.moveToFirst()) {
do {
add(index.download)
} while (index.moveToNext())
}
}
})
}
从上述方法调用的片段中下载的方法:
private fun downloadLoaded(downloads: List<MediaItem>) {
if (downloads.isEmpty()) {
downloadStateView.setState(StateView.State.EMPTY)
} else {
downloadStateView.setState(StateView.State.CONTENT)
episodeAdapter.items = listOf(Header(downloads.size)) + downloads.map(::Item)
}
}
我在片段的 onResume 中调用 streamDownloads。
override fun onResume() {
super.onResume()
Observable.merge(
streamDownloads(),
handleEmptyAction()
).autoDispose(this)
.subscribe()
}
我绝不是 RxJava 专家,所以如果有人能指出我做错了什么。
编辑 堆栈跟踪:
RxJavaAssemblyException: assembled
at dalvik.system.VMStack.getThreadStackTrace(Native Method)
at io.reactivex.Observable.map(Observable.java:9781)
at DownloadsFragment.streamDownloads(DownloadsFragment.kt:143)
at DownloadsFragment.onResume(DownloadsFragment.kt:128)
at androidx.fragment.app.Fragment.performResume(Fragment.java:2649)
at androidx.fragment.app.FragmentManagerImpl.moveToState
at FragmentManagerImpl.moveFragmentToExpectedState
at FragmentManagerImpl.moveToState(FragmentManagerImpl.java:1303)
at FragmentManagerImpl.dispatchStateChange(FragmentManagerImpl.java:2659)
at FragmentManagerImpl.dispatchResume(FragmentManagerImpl.java:2625)
at androidx.fragment.app.Fragment.performResume(Fragment.java:2658)
at FragmentManagerImpl.moveToState(FragmentManagerImpl.java:922)
at FragmentManagerImpl.moveFragmentToExpectedState
at FragmentManagerImpl.moveToState(FragmentManagerImpl.java:1303)
at FragmentManagerImpl.executeOpsTogether(FragmentManagerImpl.java:1884)
at FragmentManagerImpl.removeRedundantOperationsAndExecute
at FragmentManagerImpl.execPendingActions(FragmentManagerImpl.java:1727)
at FragmentManagerImpl$2.run(FragmentManagerImpl.java:150)
at android.os.Handler.handleCallback(Handler.java:873)
at android.os.Handler.dispatchMessage(Handler.java:99)
at android.os.Looper.loop(Looper.java:193)
at android.app.ActivityThread.main(ActivityThread.java:6669)
at RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
解决方案
推荐阅读
- reactjs - 使用 useImperativeHandle 控制子状态是一种不好的做法吗?
- python - IndexError:索引 n 超出轴 1 的范围,大小为 n
- postgresql - 如何使用外部客户端提供的任意 JSONB 查询字符串防止 SQL 注入?
- java - 如何回答这个Java面试问题:解释服务器如何知道文件上传完成
- c# - 如何让一段代码每 x 秒重复一次?
- typescript - geofirestore 不返回单个文档数据
- python - 来自 MATLAB 的“wavedec3”的 Python 等价物
- python - 如何将锚标签 ID 传递给 Django 中的 url?
- python - 多处理 Python Kafka 消费者客户端没有收到消息
- java - 为什么我不能编辑我的 JButton 的位置?