首页 > 解决方案 > 带有 Autoupdate、copyFromRealm、RxJava 的领域

问题描述

我正在使用 Realm 的 rxjava 绑定 toFlowable 在项目更新时收到通知。我必须在 UI 线程中执行此操作才能从领域获取更新通知。另一方面,我需要使结果无法管理以在线程之间传递它们并避免不一致。

public Flowable<List<City>> getLiveCityList() {
    return mRealm.where(City.class)
            .findAll()
            .asFlowable()
            .map(mRealm::copyFromRealm)
            .subscribeOn(AndroidSchedulers.mainThread());
}

例如,我正在使用 diffUtils 更新 recyclerview 的项目,并且需要获取两个不同的列表(不同的参考)。我也必须在 UI 线程中执行 copyFromRealm,因为 RealmResults 只能在它们创建的线程中使用。

DiffUtil.DiffResult diffResult = DiffUtil.calculateDiff(new DiffUtilCallback(mItems, newItems), true);
mItems = newItems;
diffResult.dispatchUpdatesTo(BaseRecyclerViewAdapter.this);

我应该将所有这些从 UI 线程中移出吗?我怎样才能做到这一点?

标签: androidandroid-recyclerviewrealmrx-java2

解决方案


首先,您需要创建一个 HandlerThread 并用 AndroidSchedulers 包装它以将其与 Rx 一起用作调度程序:

    SchedulerHolder looperScheduler = new SchedulerHolder(); // holds scheduler instance
    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler.setScheduler(AndroidSchedulers.from(handlerThread.getLooper()));
    }

然后,您现在可以将其用作subscribeOn().unsubscribeOn()在后台线程上运行的 RealmQuery:

private interface QuerySelector<E extends RealmModel> {
    RealmResults<E> createQuery(Realm realm);
}

private <T extends RealmModel> Observable<List<T>> createResults(QuerySelector<T> querySelector) {
    return Observable.create((ObservableOnSubscribe<List<T>>) emitter -> {
        Realm realm = Realm.getDefaultInstance(); // can use custom config too
        final RealmResults<T> results = querySelector.createQuery(realm);
        final RealmChangeListener<RealmResults<T>> realmChangeListener = element -> {
            if(element.isLoaded() && !emitter.isDisposed()) {
                List<R> list = new ArrayList<>(element.size());
                Collections.addAll(list, realm.copyFromRealm(element));
                if(!emitter.isDisposed()) {
                    emitter.onNext(list);
                }
            }
        };
        emitter.setDisposable(Disposables.fromAction(() -> {
            if(results.isValid()) {
                results.removeChangeListener(realmChangeListener);
            }
            realm.close();
        }));
        results.addChangeListener(realmChangeListener);
        if(results.isLoaded()) {
             emitter.onNext(realm.copyFromRealm(results));
        }    
      }) 
     .subscribeOn(looperScheduler.getScheduler())
     .unsubscribeOn(looperScheduler.getScheduler());
}

然后你可以调用它

public Observable<List<City>> getCities() {
    return createResults((realm) -> realm.where(City.class).findAllAsync());
}

不要忘记说observeOn(AndroidSchedulers.mainThread())某个时间。


推荐阅读