首页 > 解决方案 > 订阅者的 onComplete 方法是否由 Android 上的 Room 在后台线程上调用?

问题描述

我正在使用 Room 实现一个 Android 应用程序。目前,我已经将 Guave 和 Jetpack 库包含在我的项目中以供其他部分使用。但是,如果这有助于解决我的问题,我不介意包含另一个库(即 JxJava)。但是,我还没有为我的问题找到任何权威文档。

我需要对数据执行异步、可观察的查询,并且观察处理程序必须在后台线程上运行,因为我必须对结果进行一些昂贵的后处理。

对于 Java,有两种选择如何与房间进行异步交互:使用 Rxjava 或 Guava/LiveData(请参阅编写异步 DAO 查询 - 语言和框架选项

如果我想要一个异步的一次性(相对于可观察的)查询并采用 Guava/LiveData,那么 API 返回一个ListenableFuture(请参阅编写异步 DAO 查询 - 编写异步一次性查询)。将ListenableFutureaRunnable作为侦听器关联到一个Executator用于在数据更改时调度侦听器的侦听器(请参阅ListenableFuture#addListener)。这很棒,因为我的应用程序中已经有一个中央ThreadPool执行器用于后台任务。但是,这是我不想要的一次性查询。

与 Guava/LiveData 结合的异步可观察LiveData查询返回一个(请参阅编写异步 DAO 查询 - 编写异步可观察查询。这很遗憾,因为 a 的方法onChange总是在主(GUI)线程上执行。这是一个设计的原则,因为他们应该更新 UI,但这不是我需要的。当然,我可以使用主线程上的方法,在我的后台执行程序上调度一个并再次跳到后台,但这似乎涉及背景和主线程之间不必要的上下文切换。ObserverLiveDataLiveDataonChangeRunnable

所以我正在考虑使用 RxJava。与 RxJava 结合使用的异步、可观察Flowable查询返回一个. 可以使用 aConsumer订阅Flowable. 但是,当发出新值时,我没有找到任何有关分派accept-method 的线程的信息。根据 RxJava 文档,这是由创建者负责的,因为 RxJava 只定义了抽象接口,但没有规定特定的实现。在手头的情况下, 的创建者是 Room 库,但似乎没有记录,Room 使用哪个线程用于.ConsumerFlowableFlowableFlowableFlowable

Room 是否使用主线程来更新其Flowable? (那会很糟糕,而且没有改善LiveData)。Room 是否使用与Flowable数据库查询相同的后台线程来更新它?(这不会完全糟糕,但仍然可以改进。)或者 Room 是否分叉了一个仅用于更新其的新线程Flowable?(好的)

奖励:为了减少分叉和销毁线程的数量,如果 Room 不仅可以将我的应用程序范围用于查询和所有其他异步任务,我将不胜ThreadPoolExecutor感激Flowables

标签: rx-javaandroid-roomandroid-livedatathreadpoolexecutor

解决方案


在文档中找不到的东西,总是可以在源代码中找到:D

假设这样的查询:

@Query("SELECT * FROM table")
fun query(): Flowable<List<Entity>>

Room 的注释处理器将生成一些代码,其中将包含以下内容:

 return RxRoom.createFlowable(__db, true, new String[]{"table"}, new Callable<List<Entity>>() {... /*not important here*/}

检查此方法的实现:

Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
    final Maybe<T> maybe = Maybe.fromCallable(callable);
    return createFlowable(database, tableNames)
            .subscribeOn(scheduler)
            .unsubscribeOn(scheduler)
            .observeOn(scheduler)
            .flatMapMaybe(new Function<Object, MaybeSource<T>>() {
                @Override
                public MaybeSource<T> apply(Object o) throws Exception {
                    return maybe;
                }
            });

所以他们使用一个调度器,它是从执行器创建的。您可以在此处找到有关此执行器的一些信息androidx.room.RoomDatabase.Builder#setQueryExecutor

     * When both the query executor and transaction executor are unset, then a default
     * {@code Executor} will be used. The default {@code Executor} allocates and shares threads
     * amongst Architecture Components libraries. If the query executor is unset but a
     * transaction executor was set, then the same {@code Executor} will be used for queries.
     * <p>
     * For best performance the given {@code Executor} should be bounded (max number of threads
     * is limited).

例子

以下代码

    dao.query().subscribe({
        Log.d("CheckThread1", "onSuccess ${Thread.currentThread().name}")
    }, {
        Log.d("CheckThread1", "onError  ${Thread.currentThread().name}")
    })

    

将导致:

D/CheckThread1: onSuccess arch_disk_io_0

如您所见,此操作不是在主线程上处理的。

是的,您可以在构建数据库时设置自己的执行程序:

Room.databaseBuilder(
                context.applicationContext,
                Database::class.java,
                "main.db"
            )
                .setTransactionExecutor(.../*your executor*/)
                .setQueryExecutor(.../*your executor*/)
                .build()

推荐阅读