首页 > 解决方案 > 对于从数据库中检索到的每条记录,RxJava 流重复两次

问题描述

我有一个 RxJava 流,它一次遍历一个表中的所有记录,为每条记录中的每个收件人发送一封电子邮件,并存储发送是否成功的状态。这个循环会不断重复,直到从数据库中读取所有收件人并将消息发送给他们。我遇到的问题是,从流的开头(从 getMessagesToSend 开始)到 storeMessageSent 的代码被每个收件人调用两次(第一个收件人除外):

val msgToSendPublisher = BehaviorSubject.createDefault(0)

msgToSendPublisher.flatMap { startPos -> App.context.repository.getMessageToSend() }
    .flatMap { messageToSend ->
        App.context.repository.sendMessage(messageToSend)
            .doOnError {
                messageToSend.failureSending = true
            }.doOnNext { Observable.just(messageToSend) }
    }
    .zipWith( // 1 second delay between emissions.
        Observable.interval(1, TimeUnit.SECONDS),
        BiFunction { item: MessageToSend, _: Long -> item })
    .flatMap { messageToSend ->
        App.context.repository.storeMessageSent(messageToSend)
            .doOnError {
                messageToSend.failureSending = true
            }.doOnNext { Observable.just(messageToSend) }
    }
    .doOnNext { messageToSend ->
        msgToSendPublisher.onNext(0)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { messageToSend ->
        },
        { ex ->
            if (ex is EmptyResultSetException) {

            } else {
            }
        },
        {
            // Complete
        }
    )
    }

我的直播显然有问题。仅当调用 msgToSendPublisher.onNext(0) 时,流才应前进到下一条记录。

是什么导致流为每条记录重复两次?

标签: androidkotlinrx-java

解决方案


推荐阅读