java - Firebase + RxJava,onComplete() 事件
问题描述
有谁知道如何将 Firebase 与 RxJava 连接,所以当我从数据库加载所有数据时,它会运行 arrayAdapter.notifyDataSetChanged() ?我正在考虑用 onComplete() 方法编写它,但它仍然在加载所有数据之前运行
Completable.fromCallable(new Callable<List<cards>>() {
@Override
public List<cards> call() throws Exception {
newUserDb.addListenerForSingleValueEvent(new ValueEventListener() {
@Override
public void onDataChange(@NonNull DataSnapshot dataSnapshot) {
if (dataSnapshot.child(currentUID).child("sex").exists()) {
myInfo.put("sex", dataSnapshot.child(currentUID).child("sex").getValue().toString());
}
if (dataSnapshot.child(currentUID).child("dateOfBirth").exists()) {
int myAge = stringDateToAge(dataSnapshot.child(currentUID).child("dateOfBirth").getValue().toString());
myInfo.put("age", String.valueOf(myAge));
}
if (dataSnapshot.child(currentUID).child("connections").child("yes").exists()) {
for (DataSnapshot ds : dataSnapshot.child(currentUID).child("connections").child("yes").getChildren()) {
if (!dataSnapshot.child(currentUID).child("connections").child("matches").hasChild(ds.getKey())) {
Log.d("rxJava", "onDataChange: " + ds.getKey());
first.add(ds.getKey());
getTagsPreferencesUsers(dataSnapshot.child(ds.getKey()), true);
}
}
}
}
@Override
public void onCancelled(@NonNull DatabaseError databaseError) {
}
});
return rowItems;
}
}).subscribeOn(Schedulers.computation())
.subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
Log.d("rxJava", "Test RxJAVA, onSubscribe");
}
@Override
public void onComplete() {
Log.d("rxJava", "Test RxJAVA, onComplete");
}
@Override
public void onError(Throwable error) {
Log.d("rxJava", "Test RxJAVA, onError");
}
});
输出是
2020-06-04 23:36:28.797 29515-29515/com.example.tinderapp D/rxJava: Test RxJAVA, onSubscribe
2020-06-04 23:36:28.800 29515-29612/com.example.tinderapp D/rxJava: Test RxJAVA, onComplete
2020-06-04 23:36:29.018 29515-29515/com.example.tinderapp D/rxJava: onDataChange: a4hqGgAJBRTVJOlPp3blNDt5v7q1
2020-06-04 23:36:29.022 29515-29515/com.example.tinderapp D/rxJava: onDataChange: aA9HAOtaB7ao6vzKqqBNp0iaBev2
解决方案
我想说,这是预期的行为,如下所述:
Completable.fromCallable
fromCallable 接受一个 Lambda,它返回一个订阅列表。在您的情况下,也打开了一个数据库连接,这基本上是失败的,因为回调是通过回调非阻塞注册的。
订阅
这确保了 fromCallable 中的 subscribeAcutal 是从给定的调度程序中调用的。因此订阅线程和发射线程是解耦的。
您首先获得 onComplete,因为 fromCallable 将立即返回 rowItems 并且数据库连接将保持打开状态,因为您没有删除侦听器。一段时间后,您将获得数据库回调日志,因为数据库连接仍处于打开状态且侦听器仍处于注册状态。
你实际上想要做这样的事情:
Single.create<List<Card>> { emitter ->
// register onChange callback to database
// callback will be called, when a value is available
// the Single will stay open, until emitter#onSuccess is called with a collected list.
newUserDb.addListenerForSingleValueEvent {
// do some stuff
emitter.onSuccess(listOf()) // return collected data from database here...
}
emitter.setCancellable {
// unregister addListenerForSingleValueEvent from newUserDb here
}
}.subscribeOn(Schedulers.computation())
.subscribe(
// stuff
)
如果你想有源源不断的更新,用 Observable/Flowable 交换 Single
推荐阅读
- javascript - 如何在更新 asp.net 中的行时更改 GridView 的特定输入字段值
- swift - 这个错误是什么意思:错误:创建传输失败:noSupportedTransportAvailable
- rust - 迭代向量元素的组合并操作元素
- html - 在下创建下拉列表时出错在角度 7
- javascript - 如何从 AJAX responseText 中获取价值
- sql - 从大量表中检索数据的最有效方法 (20-30)
- angular - 如何在页面上播放音频加载或角度 6+ 中的每个设置间隔
- css - Flexbox - 顶部对齐内容
- c++ - Static member function and access operator
- javascript - 为什么我不能使用和箭头函数生成 {object}?