首页 > 解决方案 > Observable 只发出第一个值

问题描述

我正在尝试创建一个从 firebase 查询返回列表的可观察对象。问题是当我调用 onNext 来发射 Item 然后 onComplete 它停止发射第一个项目之后的项目,并且根本不调用 onComplete 什么都不发射。有没有正确的方法来做我想要实现的目标?我RxJava还是新手,所以请原谅我的无知。提前感谢您的任何帮助:)

public Observable<Message> getMessageObservable(String uid) {
    currentUser = auth.getCurrentUser();
    DatabaseReference db_messages = db_root.child("Messages").child(currentUser.getUid())
            .child(uid);
    Query messageQuery = db_messages.orderByKey().limitToLast(10);
    return Observable.create(emitter -> {
        messageQuery.addChildEventListener(new ChildEventListener() {
            @Override
            public void onChildAdded(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {
                String messageText = dataSnapshot.child("message").getValue().toString();
                String messageId = dataSnapshot.child("MessageId").getValue().toString();
                Boolean seen = dataSnapshot.child("seen").getValue(Boolean.class);
                Long timestamp = dataSnapshot.child("timestamp").getValue(long.class);
                String fromUser = dataSnapshot.child("from").getValue().toString();
                String toUser = dataSnapshot.child("to").getValue().toString();
                Message message = new Message(messageText, toUser, messageId, seen, timestamp, null, fromUser);
                emitter.onNext(message);
                emitter.onComplete();
            }

            @Override
            public void onChildChanged(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {

            }

            @Override
            public void onChildRemoved(@NonNull DataSnapshot dataSnapshot) {

            }

            @Override
            public void onChildMoved(@NonNull DataSnapshot dataSnapshot, @Nullable String s) {

            }

            @Override
            public void onCancelled(@NonNull DatabaseError databaseError) {

            }
        });
    });
}

@Override
public void getMessages(String userId) {
    currentUser = auth.getCurrentUser();
    Observable.just(userId)
            .flatMap(this::getMessageObservable)
            .toList()
            .subscribe(messages -> {
                chatResults.getMessagesResult(messages);
            });
}

标签: javaandroidfirebasefirebase-realtime-databaserx-java

解决方案


与往常一样,有很多方法可以解决问题。请检查这是否适合您:

  1. 更改getMessageObservable为获取引用、查询和添加ChildEventListener侦听器的简单方法(未创建可观察对象等)
  2. 创建PublishSubject<String> myMessages = PublishSubject.create()pub 主题,订阅它,就像你通常对 observables 所做的那样。在您的订阅中确保收听 onNext 操作 (Action1)
  3. 在你的ChilddEventListenerimpl 中,确保在myMessages.onNext(message)新消息到达时调用

通过上述设置,您现在将收到消息发送到您的 onNext 订阅。您可以保留可变列表并附加(或前置)即将到来的消息,从而通知相关方重新更新消息列表。


推荐阅读