首页 > 解决方案 > Firebase + RxJava,onComplete() 事件

问题描述

有谁知道如何将 Firebase 与 RxJava 连接,所以当我从数据库加载所有数据时,它会运行 arrayAdapter.notifyDataSetChanged() ?我正在考虑用 onComplete() 方法编写它,但它仍然在加载所有数据之前运行

        Completable.fromCallable(new Callable<List<cards>>() {
            @Override
            public List<cards> call() throws Exception {
                newUserDb.addListenerForSingleValueEvent(new ValueEventListener() {
                    @Override
                    public void onDataChange(@NonNull DataSnapshot dataSnapshot) {
                        if (dataSnapshot.child(currentUID).child("sex").exists()) {
                            myInfo.put("sex", dataSnapshot.child(currentUID).child("sex").getValue().toString());
                        }
                        if (dataSnapshot.child(currentUID).child("dateOfBirth").exists()) {
                            int myAge = stringDateToAge(dataSnapshot.child(currentUID).child("dateOfBirth").getValue().toString());
                            myInfo.put("age", String.valueOf(myAge));
                        }

                        if (dataSnapshot.child(currentUID).child("connections").child("yes").exists()) {
                            for (DataSnapshot ds : dataSnapshot.child(currentUID).child("connections").child("yes").getChildren()) {
                                if (!dataSnapshot.child(currentUID).child("connections").child("matches").hasChild(ds.getKey())) {
                                    Log.d("rxJava", "onDataChange: " + ds.getKey());
                                    first.add(ds.getKey());
                                    getTagsPreferencesUsers(dataSnapshot.child(ds.getKey()), true);
                                }
                            }
                        }
                    }

                    @Override
                    public void onCancelled(@NonNull DatabaseError databaseError) {

                    }
                });
                return rowItems;
            }
        }).subscribeOn(Schedulers.computation())
                .subscribe(new CompletableObserver() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d("rxJava", "Test RxJAVA, onSubscribe");
                    }

                    @Override
                    public void onComplete() {
                        Log.d("rxJava", "Test RxJAVA, onComplete");
                    }

                    @Override
                    public void onError(Throwable error) {
                        Log.d("rxJava", "Test RxJAVA, onError");
                    }
                });

输出是

2020-06-04 23:36:28.797 29515-29515/com.example.tinderapp D/rxJava: Test RxJAVA, onSubscribe
2020-06-04 23:36:28.800 29515-29612/com.example.tinderapp D/rxJava: Test RxJAVA, onComplete
2020-06-04 23:36:29.018 29515-29515/com.example.tinderapp D/rxJava: onDataChange: a4hqGgAJBRTVJOlPp3blNDt5v7q1
2020-06-04 23:36:29.022 29515-29515/com.example.tinderapp D/rxJava: onDataChange: aA9HAOtaB7ao6vzKqqBNp0iaBev2

标签: javaandroidfirebasefirebase-realtime-databaserx-java

解决方案


我想说,这是预期的行为,如下所述:

Completable.fromCallable

fromCallable 接受一个 Lambda,它返回一个订阅列表。在您的情况下,也打开了一个数据库连接,这基本上是失败的,因为回调是通过回调非阻塞注册的。

订阅

这确保了 fromCallable 中的 subscribeAcutal 是从给定的调度程序中调用的。因此订阅线程和发射线程是解耦的。

您首先获得 onComplete,因为 fromCallable 将立即返回 rowItems 并且数据库连接将保持打开状态,因为您没有删除侦听器。一段时间后,您将获得数据库回调日志,因为数据库连接仍处于打开状态且侦听器仍处于注册状态。

你实际上想要做这样的事情:

    Single.create<List<Card>> { emitter ->
        // register onChange callback to database
        // callback will be called, when a value is available
        // the Single will stay open, until emitter#onSuccess is called with a collected list.
        newUserDb.addListenerForSingleValueEvent {
            // do some stuff

            emitter.onSuccess(listOf()) // return collected data from database here...
        }

        emitter.setCancellable {
            // unregister addListenerForSingleValueEvent from newUserDb here
        }
    }.subscribeOn(Schedulers.computation())
        .subscribe(
            // stuff
        )

如果你想有源源不断的更新,用 Observable/Flowable 交换 Single


推荐阅读