android - 对于从数据库中检索到的每条记录,RxJava 流重复两次
问题描述
我有一个 RxJava 流,它一次遍历一个表中的所有记录,为每条记录中的每个收件人发送一封电子邮件,并存储发送是否成功的状态。这个循环会不断重复,直到从数据库中读取所有收件人并将消息发送给他们。我遇到的问题是,从流的开头(从 getMessagesToSend 开始)到 storeMessageSent 的代码被每个收件人调用两次(第一个收件人除外):
val msgToSendPublisher = BehaviorSubject.createDefault(0)
msgToSendPublisher.flatMap { startPos -> App.context.repository.getMessageToSend() }
.flatMap { messageToSend ->
App.context.repository.sendMessage(messageToSend)
.doOnError {
messageToSend.failureSending = true
}.doOnNext { Observable.just(messageToSend) }
}
.zipWith( // 1 second delay between emissions.
Observable.interval(1, TimeUnit.SECONDS),
BiFunction { item: MessageToSend, _: Long -> item })
.flatMap { messageToSend ->
App.context.repository.storeMessageSent(messageToSend)
.doOnError {
messageToSend.failureSending = true
}.doOnNext { Observable.just(messageToSend) }
}
.doOnNext { messageToSend ->
msgToSendPublisher.onNext(0)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ messageToSend ->
},
{ ex ->
if (ex is EmptyResultSetException) {
} else {
}
},
{
// Complete
}
)
}
我的直播显然有问题。仅当调用 msgToSendPublisher.onNext(0) 时,流才应前进到下一条记录。
是什么导致流为每条记录重复两次?
解决方案
推荐阅读
- vue.js - 未知的自定义元素:
验证 - c# - 在第一场比赛后停止 Regex positive Lookbehind
- javascript - 为什么在异步函数中捕获后仍然抛出异常?
- python - 在 python 中使用“with”语句自动关闭 pyplot.figure
- amazon-s3 - Cloudfront 和 s3 gzip 给出相互矛盾的结果
- dictionary - Go 中的哪些语义规则决定何时发生单值赋值与何时发生二值赋值?
- react-native - 如何在世博相机中使用构造函数?
- python - 在 django 管理表单中处理计算的后保存方法
- python - 如何让程序告诉你正在打印哪一行?
- c# - Microsoft.Azure.KeyVault.Models.KeyVaultErrorException:'操作返回了无效的状态代码'BadRequest''