rx-java - 订阅者的 onComplete 方法是否由 Android 上的 Room 在后台线程上调用?
问题描述
我正在使用 Room 实现一个 Android 应用程序。目前,我已经将 Guave 和 Jetpack 库包含在我的项目中以供其他部分使用。但是,如果这有助于解决我的问题,我不介意包含另一个库(即 JxJava)。但是,我还没有为我的问题找到任何权威文档。
我需要对数据执行异步、可观察的查询,并且观察处理程序必须在后台线程上运行,因为我必须对结果进行一些昂贵的后处理。
对于 Java,有两种选择如何与房间进行异步交互:使用 Rxjava 或 Guava/LiveData(请参阅编写异步 DAO 查询 - 语言和框架选项)
如果我想要一个异步的一次性(相对于可观察的)查询并采用 Guava/LiveData,那么 API 返回一个ListenableFuture
(请参阅编写异步 DAO 查询 - 编写异步一次性查询)。将ListenableFuture
aRunnable
作为侦听器关联到一个Executator
用于在数据更改时调度侦听器的侦听器(请参阅ListenableFuture#addListener)。这很棒,因为我的应用程序中已经有一个中央ThreadPool
执行器用于后台任务。但是,这是我不想要的一次性查询。
与 Guava/LiveData 结合的异步可观察LiveData
查询返回一个(请参阅编写异步 DAO 查询 - 编写异步可观察查询。这很遗憾,因为 a 的方法onChange
总是在主(GUI)线程上执行。这是一个设计的原则,因为他们应该更新 UI,但这不是我需要的。当然,我可以使用主线程上的方法,在我的后台执行程序上调度一个并再次跳到后台,但这似乎涉及背景和主线程之间不必要的上下文切换。Observer
LiveData
LiveData
onChange
Runnable
所以我正在考虑使用 RxJava。与 RxJava 结合使用的异步、可观察Flowable
查询返回一个. 可以使用 aConsumer
订阅Flowable
. 但是,当发出新值时,我没有找到任何有关分派accept
-method 的线程的信息。根据 RxJava 文档,这是由创建者负责的,因为 RxJava 只定义了抽象接口,但没有规定特定的实现。在手头的情况下, 的创建者是 Room 库,但似乎没有记录,Room 使用哪个线程用于.Consumer
Flowable
Flowable
Flowable
Flowable
Room 是否使用主线程来更新其Flowable
? (那会很糟糕,而且没有改善LiveData
)。Room 是否使用与Flowable
数据库查询相同的后台线程来更新它?(这不会完全糟糕,但仍然可以改进。)或者 Room 是否分叉了一个仅用于更新其的新线程Flowable
?(好的)
奖励:为了减少分叉和销毁线程的数量,如果 Room 不仅可以将我的应用程序范围用于查询和所有其他异步任务,我将不胜ThreadPoolExecutor
感激Flowables
。
解决方案
在文档中找不到的东西,总是可以在源代码中找到:D
假设这样的查询:
@Query("SELECT * FROM table")
fun query(): Flowable<List<Entity>>
Room 的注释处理器将生成一些代码,其中将包含以下内容:
return RxRoom.createFlowable(__db, true, new String[]{"table"}, new Callable<List<Entity>>() {... /*not important here*/}
检查此方法的实现:
Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.subscribeOn(scheduler)
.unsubscribeOn(scheduler)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});
所以他们使用一个调度器,它是从执行器创建的。您可以在此处找到有关此执行器的一些信息androidx.room.RoomDatabase.Builder#setQueryExecutor
:
* When both the query executor and transaction executor are unset, then a default
* {@code Executor} will be used. The default {@code Executor} allocates and shares threads
* amongst Architecture Components libraries. If the query executor is unset but a
* transaction executor was set, then the same {@code Executor} will be used for queries.
* <p>
* For best performance the given {@code Executor} should be bounded (max number of threads
* is limited).
例子
以下代码
dao.query().subscribe({
Log.d("CheckThread1", "onSuccess ${Thread.currentThread().name}")
}, {
Log.d("CheckThread1", "onError ${Thread.currentThread().name}")
})
将导致:
D/CheckThread1: onSuccess arch_disk_io_0
如您所见,此操作不是在主线程上处理的。
是的,您可以在构建数据库时设置自己的执行程序:
Room.databaseBuilder(
context.applicationContext,
Database::class.java,
"main.db"
)
.setTransactionExecutor(.../*your executor*/)
.setQueryExecutor(.../*your executor*/)
.build()
推荐阅读
- c++ - 如何解决 C++ 中不明确的函数选择?
- javascript - 为 Bootstrap 主题运行 npm i gulp-cli g 时权限被拒绝
- c# - 创建具有多个键的字典并使用其中一个键获取值
- python - 在 ipywidgets 的交互中处理 pandas 时,我可以为每个项目设置一个层次结构吗?
- c++ - 如何在编辑控件中添加 Enter/Return 键的功能
- ruby-on-rails - 获取 RVM 路径后,在 Docker 容器中找不到 Gem
- excel - .在错误的列中查找搜索(vba)(固定)
- angular - Angular:为嵌套属性赋值
- list - 在haskell中切换列表中的每个元素
- java - 如何在分层地图中正确使用 flatMap
?